Skip to content

Commit

Permalink
IDS-9547: Max Processes Per Worker (#126)
Browse files Browse the repository at this point in the history
* add max process per worker config

* modify cws_worker in core db

* modify core sql

* rename new proc variable worker_max_num_running_procs

* MaxNumForProcsOnWorker variable for current Running processes

* remove added totalNumOfProcDefKeysOnWorker var

* clean up workerservice commit

* set cap on queryLimit for claiming a process start

* rename variables

* add upgrade script for DB

* working but needs clean and optimization

* clean up logs

* clean up CWSInstaller

* worker_max_num_running_procs changed to default 16

* variable description update

* removed limitProcDefKey condition in claimWithCounter

* set limit param to null. All enabled procs have opportunity to be claimed. update README

* claim query
  • Loading branch information
voxparcxls authored Feb 9, 2023
1 parent 9450bb8 commit f7f03c3
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 74 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,15 @@ ES_PASSWORD="na"
# Num of workers to start. 1 is the minimum.
NUM_WORKERS=1
# Max number of running processes per worker. Default value is 16. 1 is the minimum.
WORKER_MAX_NUM_RUNNING_PROCS=16
# Default value is 1. Specifies the number of days (int) until the
# abandoned workers in the cws_workers database table are cleaned out.
WORKER_ABANDONED_DAYS=1
# Run the dev script
./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_ABANDONED_DAYS}
./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_MAX_NUM_RUNNING_PROCS} ${WORKER_ABANDONED_DAYS}
```

###### Run Personal Dev Script
Expand Down
89 changes: 73 additions & 16 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public class SchedulerDbService extends DbService implements InitializingBean {

public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1;
public static final int PROCESSES_PAGE_SIZE = 100;

// KEY FOR THIS IS: KEY `claimKey` (`status`,`proc_def_key`,`priority`,`created_time`)

public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid FROM cws_sched_worker_proc_inst " +
"WHERE " +
Expand Down Expand Up @@ -243,17 +242,24 @@ public int updateProcInstIdAndStartedByWorker(
return numUpdated;
}





/**
* Attempt to claim a process start request in the database.
*
* @param procDefKey -- only attempt to claim rows for this process definition
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*
*/
public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, String procDefKey, int limit) {

public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
Expand All @@ -266,11 +272,45 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St
try {
// Find claimable rows
//
rowUuids = jdbcTemplate.queryForList(
FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[] {procDefKey,
limit*2}); // over-find because some workers might compete with this set
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
}

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
Expand All @@ -283,7 +323,7 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKey + "'");
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
Expand All @@ -293,12 +333,12 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKey '" + procDefKey + "', but claimed none! " +
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
}
else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKey '" + procDefKey + "'");
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
}
else if (log.isTraceEnabled()) {
Expand Down Expand Up @@ -335,11 +375,11 @@ else if (log.isTraceEnabled()) {
if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()" );
}

Map<String,List<String>> ret = new HashMap<String,List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}

Expand All @@ -356,8 +396,25 @@ public String getProcInstRowStatus(String uuid) {
return null;
}
}



/**
 * Looks up the max_num_running_procs value stored in the cws_worker row
 * of the given worker.
 *
 * @param workerId id of the cws_worker row to read
 * @return the max_num_running_procs column value for that worker
 */
public int getMaxProcsValueForWorker(String workerId) {
	final String sql = "SELECT max_num_running_procs FROM cws_worker WHERE id=?";
	final Object[] args = new Object[] {workerId};
	Integer maxNumRunningProcs = jdbcTemplate.queryForObject(sql, args, Integer.class);
	return maxNumRunningProcs;
}

/**
 * Counts the rows in cws_sched_worker_proc_inst that belong to the given
 * process definition key and whose claim_uuid is one of the given claim UUIDs.
 *
 * All values are passed as bound parameters rather than concatenated into
 * the SQL text, so they cannot alter the statement and need no quoting.
 *
 * @param procDefKey   process definition key to match against proc_def_key
 * @param claimedUuids claim UUIDs to match against claim_uuid
 * @return number of matching rows; 0 if claimedUuids is null or empty
 */
public int getCountForClaimedProcInstPerKey(String procDefKey, List<String> claimedUuids) {
	// Nothing was claimed, so nothing can match -- skip the DB round trip
	// (and avoid generating an invalid empty "IN ()" clause).
	if (claimedUuids == null || claimedUuids.isEmpty()) {
		return 0;
	}

	// Build one "?" placeholder per claim UUID, and the matching argument array:
	// args[0] is the procDefKey, args[1..n] are the claim UUIDs.
	StringBuilder placeholders = new StringBuilder();
	Object[] args = new Object[claimedUuids.size() + 1];
	args[0] = procDefKey;
	int argIndex = 1;
	for (String claimUuid : claimedUuids) {
		if (argIndex > 1) {
			placeholders.append(", ");
		}
		placeholders.append('?');
		args[argIndex++] = claimUuid;
	}

	String query =
		"SELECT count(*) FROM cws_sched_worker_proc_inst " +
		"WHERE proc_def_key=? " +
		"AND claim_uuid IN (" + placeholders + ")";

	Integer count = jdbcTemplate.queryForObject(query, args, Integer.class);
	return count == null ? 0 : count;
}


/**
 * Looks up the process definition key of a cws_sched_worker_proc_inst row.
 *
 * The uuid is passed as a bound parameter rather than concatenated into
 * the SQL text, so it cannot alter the statement and needs no quoting.
 *
 * @param uuid value to match against the uuid column
 * @return the proc_def_key column value of the matching row
 */
public String getProcDefKeyFromUuid(String uuid) {
	return jdbcTemplate.queryForObject(
		"SELECT proc_def_key FROM cws_sched_worker_proc_inst WHERE uuid=?",
		new Object[] {uuid}, String.class);
}

public Map<String,Object> getProcInstRow(String uuid) {
List<Map<String,Object>> list = jdbcTemplate.queryForList(
"SELECT * FROM cws_sched_worker_proc_inst " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class EngineDbService extends DbService implements InitializingBean {
@Value("${cws.install.type}") private String cwsInstallType;
@Value("${cws.worker.type}") private String cwsWorkerType;
@Value("${camunda.executor.service.max.pool.size}") private int maxExecutorServicePoolSize;
@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

Expand Down Expand Up @@ -180,8 +181,8 @@ public void createOrUpdateWorkerRow(String lockOwner) {
numUpdated = jdbcTemplate.update(
"INSERT INTO cws_worker" +
" (id, lock_owner, name, install_directory, cws_install_type, cws_worker_type, " +
" status, job_executor_max_pool_size, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?)",
" status, job_executor_max_pool_size, max_num_running_procs, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?)",
new Object[]{
workerId,
lockOwner,
Expand All @@ -191,6 +192,7 @@ public void createOrUpdateWorkerRow(String lockOwner) {
cwsWorkerType,
null, // status will be changed to null once the worker is fully initialized
maxExecutorServicePoolSize, // changeable later via the UI..
workerMaxNumRunningProcs,
tsNow, // created_time
tsNow // last_heartbeat_time
});
Expand Down
124 changes: 80 additions & 44 deletions cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public class WorkerService implements InitializingBean {
@Value("${cws.engine.jobexecutor.enabled}") private boolean jobExecutorEnabled;

@Value("${cws.tomcat.lib}") private String cwsTomcatLib;


@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

public static AtomicInteger processorExecuteCount = new AtomicInteger(0);
Expand All @@ -84,8 +86,9 @@ public class WorkerService implements InitializingBean {

// Map of procDefKey and count of active process instances
public static Map<String,Integer> processCounters = new HashMap<String,Integer>();

private static Map<String,Integer> workerMaxProcInstances = new HashMap<String,Integer>();

private static Set<String> procStartReqUuidStartedThisWorker = new HashSet<String>();
private static Set<String> acceptingProcDefKeys = new HashSet<String>();
//private static Set<String> runningToCompleteTransitionUuids = new HashSet<String>();
Expand Down Expand Up @@ -185,7 +188,7 @@ public void initProcessCountersAndLimits() {

}

log.debug("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters);
log.info("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters);
}


Expand Down Expand Up @@ -312,7 +315,7 @@ public boolean updateProcessCountersAndLimits() {
//
String postConfig = "limits: " + workerMaxProcInstances + ", counts: " + processCounters;
if (lastProcCounterStatusMsg == null || !lastProcCounterStatusMsg.equals(postConfig)) {
log.debug("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg);
log.info("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg);
lastProcCounterStatusMsg = postConfig;
return true; // config changed
}
Expand Down Expand Up @@ -670,59 +673,92 @@ public List<Map<String,Object>> claimWithCounter(String limitToProcDefKey) {

synchronized (procStateLock) { // procCountsLock
t1 = System.currentTimeMillis();

int procSetSize = 0;
//int totalCurrentRunningProcsOnWorker = 0;
Map<String,Integer> currentCounts = new HashMap<String,Integer>();
Map<String,Integer> remainders = new HashMap<String,Integer>();
Map<String,Integer> queryLimitForProcSet = new HashMap<String,Integer>();
Map<String,Integer> limitToProcDefKeyObject = new HashMap<String,Integer>();

for (Entry<String,Integer> procMax : workerMaxProcInstances.entrySet()) {
String procDefKey = procMax.getKey();
if (limitToProcDefKey != null && !limitToProcDefKey.equals(procDefKey)) {
continue;
}

int procMaxNumber = procMax.getValue();
if (!acceptingProcDefKeys.contains(procDefKey)) {
//log.debug("skipping " + procDefKey + " BECAUSE IT NOT ACCEPTING RIGHT NOW!!!!");
continue;
}


currentCounts.put(procDefKey, processCounters.get(procDefKey));
remainders.put(procDefKey, procMaxNumber - currentCounts.get(procDefKey));
queryLimitForProcSet.put(procDefKey, Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainders.get(procDefKey)));

//log.trace("getting currentCount for procDefKey " + procDefKey);
int currentCount = processCounters.get(procDefKey);
//int currentCount = processCounters.get(procDefKey);
//log.trace("currentCount for " + procDefKey + " is " + currentCount);
int remainder = procMaxNumber - currentCount;
//int remainder = procMaxNumber - currentCount;
//log.trace("remainder for " + procDefKey + " is " + remainder);
int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder);
//int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder); // FIXME: needs revisit for proper min
//log.trace("queryLimit for " + procDefKey + " is " + queryLimit);

if (remainder > 0) {
// claim for remainder (marks DB rows as "claimedByWorker")
Map<String,List<String>> claimRowData =
schedulerDbService.claimHighestPriorityStartReq(
workerId, procDefKey, queryLimit);

List<String> claimed = claimRowData.get("claimUuids");

if (!claimed.isEmpty()) {
// increment counter by amount that was actually claimed
// in anticipation that the start will actually work.
// If the start turns out not to later worker, then this count will be decremented at that time.
//
processCounters.put(procDefKey, processCounters.get(procDefKey) + claimed.size());
// update uuid list
procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids"));
//log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker);

log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");

claimUuids.addAll(claimed);

} // end for loop

int totalCurrentRunningProcsOnWorker = 0;
for (Entry<String,Integer> entry : processCounters.entrySet()) {
totalCurrentRunningProcsOnWorker += entry.getValue().intValue();
}

// rename to workerMaxProcQueryLimit
int MaxNumForProcsOnWorker = schedulerDbService.getMaxProcsValueForWorker(workerId);
// this is for all procDefs cap
int workerMaxProcQueryLimit = MaxNumForProcsOnWorker - totalCurrentRunningProcsOnWorker;

int remaindersTotal = 0;
for (int r: remainders.values()) {
remaindersTotal += r;
}

if (remaindersTotal > 0 && workerMaxProcQueryLimit > 0) {
// claim for remainder (marks DB rows as "claimedByWorker")

int queryLimit = Math.min(MaxNumForProcsOnWorker, workerMaxProcQueryLimit);

Map<String,List<String>> claimRowData =
schedulerDbService.claimHighestPriorityStartReq(
workerId, currentCounts, queryLimitForProcSet, queryLimit); // pass list of procDefkey and a map of queryLimit per procDefKey

List<String> claimed = claimRowData.get("claimUuids");

if (!claimed.isEmpty()) {
// increment counter by amount that was actually claimed
// in anticipation that the start will actually work.
// If the start turns out not to later worker, then this count will be decremented at that time.
//
for (Map.Entry<String,Integer> procDefKey : processCounters.entrySet()) {
int claimedInstCount = schedulerDbService.getCountForClaimedProcInstPerKey(procDefKey.getKey(), claimed);
processCounters.put(procDefKey.getKey(), processCounters.get(procDefKey.getKey()) + claimedInstCount);
}
//else {
// log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");
//}
}
else {
log.debug("[" + procDefKey + "] remainder <= 0, so not attempting claim. " +
"(remainder = " + remainder +
", procMaxNumber = " + procMaxNumber +
", currentCount = " + currentCount + ")");

// update uuid list
procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids"));
//log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker);

log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + workerMaxProcInstances.entrySet() + ") for procDefKeys '" + workerMaxProcInstances.keySet() + "' (limitToProcDefKey="+limitToProcDefKey+")" + ", workerMaxNumRunningProcs=" + MaxNumForProcsOnWorker);
claimUuids.addAll(claimed);
}

} // end for loop
//else {
// log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");
//}
}
else {
log.debug("Remainder for Worker Max Process Limit [" + workerMaxProcQueryLimit + "] workerMaxProcQueryLimit <= 0 OR Total of remainders [" + remaindersTotal + "] is <=0, so not attempting claim. " +
"(remainders = " + remainders +
", procMaxNumbers = " + workerMaxProcInstances.entrySet() +
", currentCounts = " + currentCounts + ")");
}



} // release lock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,15 @@ public void run() {
boolean endedOnThisWorker =
workerService.processEndedActions(procDefKey, uuid);
if (endedOnThisWorker) {
workerService.procStartReqAction(procDefKey, "processEndEventDetected message received");
workerService.procStartReqAction(null, "processEndEventDetected message received");
}
}
else if (eventType.equals("sync")) {
boolean processCounterStateChanged = workerService.syncCounters("received " + eventType + " message");

if (processCounterStateChanged) {
log.trace(eventType + " :: state changed");

// If the process counter state changed, then we potentially have more bandwidth to
// execute more processes of the type that was just completed/failed.
workerService.procStartReqAction(procDefKey, "sync message received");
workerService.procStartReqAction(null, "sync message received");
}
}
}
Expand Down
Loading

0 comments on commit f7f03c3

Please sign in to comment.