Skip to content

Commit

Permalink
Jason work to fix worker priority scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
zef committed Dec 12, 2024
1 parent 9bcc931 commit aa10384
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SchedulerDbService extends DbService implements InitializingBean {
public static final int PROCESSES_PAGE_SIZE = 100;

public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid FROM cws_sched_worker_proc_inst " +
"SELECT uuid, priority FROM cws_sched_worker_proc_inst " +
"WHERE " +
" status='"+PENDING+"' AND " +
" proc_def_key=? " +
Expand Down Expand Up @@ -256,10 +256,10 @@ public int updateProcInstIdAndStartedByWorker(
public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
List<Map<String, Object>> rowUuidsPerProcDefKey = new ArrayList<Map<String, Object>>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<Map<String, Object>> unfilteredProcesses = new ArrayList<Map<String, Object>>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
Expand All @@ -273,16 +273,21 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Ma
// Find claimable rows
//
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[] {procs.getKey(), procs.getValue()*2});
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
unfilteredProcesses.addAll(rowUuidsPerProcDefKey);
}

unfilteredProcesses.sort(new Comparator<Map<String, Object>>() {
public int compare(Map<String, Object> one, Map<String, Object> two) {
return ((Integer) one.get("priority")).compareTo((Integer) two.get("priority"));
}
});

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
for (Map<String, Object> proc : unfilteredProcesses) {
String uuid = (String) proc.get("uuid");
String procDefKeyString = getProcDefKeyFromUuid(uuid);
uuidAndProcDefKeyPair.put(uuid, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Expand Down Expand Up @@ -1537,4 +1542,4 @@ public int retryFailedToStart(List<String> uuids) {

return jdbcTemplate.update(query);
}
}
}

0 comments on commit aa10384

Please sign in to comment.