Skip to content

Commit

Permalink
Jason work to fix worker priority scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
zef committed Dec 17, 2024
1 parent 4014080 commit 29ad419
Showing 1 changed file with 174 additions and 156 deletions.
330 changes: 174 additions & 156 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public class SchedulerDbService extends DbService implements InitializingBean {
public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1;
public static final int PROCESSES_PAGE_SIZE = 50;

public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid FROM cws_sched_worker_proc_inst " +
"WHERE " +
" status='" + PENDING + "' AND " +
" proc_def_key=? " +
"ORDER BY " +
" priority ASC, " + // lower priorities favored
" created_time ASC " + // older dates (FIFO) favored
"LIMIT ?";
public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid, priority FROM cws_sched_worker_proc_inst " +
"WHERE " +
" status='"+PENDING+"' AND " +
" proc_def_key=? " +
"ORDER BY " +
" priority ASC, " + // lower priorities favored
" created_time ASC " + // older dates (FIFO) favored
"LIMIT ?";

public static final String UPDATE_CLAIMABLE_ROW_SQL =
"UPDATE cws_sched_worker_proc_inst " +
Expand Down Expand Up @@ -242,161 +242,173 @@ public int updateProcInstIdAndStartedByWorker(


/**
* Attempt to claim a process start request in the database.
*
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*/

public Map<String, List<String>> claimHighestPriorityStartReq(String workerId, Map<String, Integer> workerProcsList, Map<String, Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[]{procs.getKey(), procs.getValue() * 2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
}

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
}

for (Map.Entry<String, Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
* Attempt to claim a process start request in the database.
*
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*
*/

public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<Map<String, Object>> rowUuidsPerProcDefKey = new ArrayList<Map<String, Object>>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<Map<String, Object>> unfilteredProcesses = new ArrayList<Map<String, Object>>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
String claimUuid = null;
int attempts = 0;

// Try, until succeeding in claiming at least one row
//
while (attempts++ < 10) {
try {
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredProcesses.addAll(rowUuidsPerProcDefKey);
}

unfilteredProcesses.sort(new Comparator<Map<String, Object>>() {
public int compare(Map<String, Object> one, Map<String, Object> two) {
return ((Integer) one.get("priority")).compareTo((Integer) two.get("priority"));
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[]{workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
} else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
} else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
} catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
} catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
} else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()");
}

Map<String, List<String>> ret = new HashMap<String, List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}
});

for (Map<String, Object> proc : unfilteredProcesses) {
String uuid = (String) proc.get("uuid");
String procDefKeyString = getProcDefKeyFromUuid(uuid);
uuidAndProcDefKeyPair.put(uuid, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
for (String uuid : rowUuids) {
claimUuid = UUID.randomUUID().toString();
int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL,
new Object[] {workerId, claimUuid, uuid, workerId});

if (updateCount == 1) {
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
break; // we have claimed up to the limit, so stop claiming
}
}

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
}
else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
}
else if (log.isTraceEnabled()) {
log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME");
}

break; // no retry needed
}
catch (DeadlockLoserDataAccessException e) {
if (attempts == 10) {
log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!..");
break; // give up
}
log.warn("Caught a DeadlockLoserDataAccessException. Retrying..");
continue; // retry
}
catch (Throwable t) {
log.error("Unexpected exception. Not retrying..", t);
break; // abort
}
} // end while (attempts)

long timeTaken = System.currentTimeMillis() - t0;
if (timeTaken > SLOW_WARN_THRESHOLD) {
log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!");
}
if (numClaimed >= 1) {
log.info("worker " + workerId + " claimed " + numClaimed + " row(s).");
}
else {
log.trace("no rows claimed by worker: " + workerId);
}

if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()" );
}

Map<String,List<String>> ret = new HashMap<String,List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}


public String getProcInstRowStatus(String uuid) {
List<Map<String, Object>> list = jdbcTemplate.queryForList(
"SELECT status FROM cws_sched_worker_proc_inst " +
"WHERE uuid=?",
new Object[]{uuid});
if (list != null && !list.isEmpty()) {
return list.iterator().next().values().iterator().next().toString();
} else {
return null;
}
}
List<Map<String,Object>> list = jdbcTemplate.queryForList(
"SELECT status FROM cws_sched_worker_proc_inst " +
"WHERE uuid=?",
new Object[] {uuid});
if (list != null && !list.isEmpty()) {
return list.iterator().next().values().iterator().next().toString();
}
else {
return null;
}
}

public int getMaxProcsValueForWorker(String workerId) {
return jdbcTemplate.queryForObject(
"SELECT max_num_running_procs FROM cws_worker WHERE id=?",
new Object[]{workerId}, Integer.class);
}
return jdbcTemplate.queryForObject(
"SELECT max_num_running_procs FROM cws_worker WHERE id=?",
new Object[] {workerId}, Integer.class);
}

public int getCountForClaimedProcInstPerKey(String procDefKey, List<String> claimedUuids) {
String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"";
String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")";
return jdbcTemplate.queryForObject(query, Integer.class);
}
String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"" ;
String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")";
return jdbcTemplate.queryForObject(query, Integer.class);
}

public List<Map<String, Object>> getProcDefKeyLatestCompleteInst(String procDefKey) {
return jdbcTemplate.queryForList(
Expand Down Expand Up @@ -1121,6 +1133,7 @@ public List<Map<String, Object>> getProcessInstanceStats(String lastNumHours) {

List<Map<String, Object>> camundaAndCwsStatuses = jdbcTemplate.queryForList(query, time, time, time, time);

<<<<<<< HEAD
ret.addAll(camundaAndCwsStatuses);

return ret;
Expand Down Expand Up @@ -1628,3 +1641,8 @@ public int retryFailedToStart(List<String> uuids) {
return jdbcTemplate.update(query);
}
}
=======
return jdbcTemplate.update(query);
}
}
>>>>>>> aa10384 (Jason work to fix worker priority scheduling)

0 comments on commit 29ad419

Please sign in to comment.