diff --git a/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java b/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java index 7dd9c353..448da666 100644 --- a/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java +++ b/cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java @@ -48,15 +48,15 @@ public class SchedulerDbService extends DbService implements InitializingBean { public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1; public static final int PROCESSES_PAGE_SIZE = 50; - public static final String FIND_CLAIMABLE_ROWS_SQL = - "SELECT uuid FROM cws_sched_worker_proc_inst " + - "WHERE " + - " status='" + PENDING + "' AND " + - " proc_def_key=? " + - "ORDER BY " + - " priority ASC, " + // lower priorities favored - " created_time ASC " + // older dates (FIFO) favored - "LIMIT ?"; + public static final String FIND_CLAIMABLE_ROWS_SQL = + "SELECT uuid, priority FROM cws_sched_worker_proc_inst " + + "WHERE " + + " status='"+PENDING+"' AND " + + " proc_def_key=? " + + "ORDER BY " + + " priority ASC, " + // lower priorities favored + " created_time ASC " + // older dates (FIFO) favored + "LIMIT ?"; public static final String UPDATE_CLAIMABLE_ROW_SQL = "UPDATE cws_sched_worker_proc_inst " + @@ -242,161 +242,173 @@ public int updateProcInstIdAndStartedByWorker( /** - * Attempt to claim a process start request in the database. - * - * @param workerProcsList -- attempts to claim rows for the active set of process definition(s) - * @return mappings of claimUuids and claimedRowUuids - */ - - public Map> claimHighestPriorityStartReq(String workerId, Map workerProcsList, Map limitsPerProcs, int limit) { - List claimUuids = new ArrayList(); - List rowUuids = new ArrayList(); - List rowUuidsPerProcDefKey = new ArrayList(); - LinkedHashMap uuidAndProcDefKeyPair = new LinkedHashMap(); - List clearOutUnclaimedInst = new ArrayList(); - List unfilteredRowUuids = new ArrayList(); - List claimedRowUuids = new ArrayList(); - long t0 = System.currentTimeMillis(); - int numClaimed = 0; - String claimUuid = null; - int attempts = 0; - - // Try, until succeeding in claiming at least one row - // - while (attempts++ < 10) { - try { - // Find claimable rows - // - for (Map.Entry procs : limitsPerProcs.entrySet()) { - rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class, - new Object[]{procs.getKey(), procs.getValue() * 2}); - // get list of uuids using array of procdefkeys IN (keys) - unfilteredRowUuids.addAll(rowUuidsPerProcDefKey); - } - - Collections.sort(unfilteredRowUuids); - for (String id : unfilteredRowUuids) { - String procDefKeyString = getProcDefKeyFromUuid(id); - uuidAndProcDefKeyPair.put(id, procDefKeyString); - } - - for (Map.Entry procLimit : limitsPerProcs.entrySet()) { - Set keys = uuidAndProcDefKeyPair.keySet(); - int applyPerProcsCap = 0; - for (String key : keys) { - - if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) { - applyPerProcsCap = applyPerProcsCap + 1; - if (applyPerProcsCap > procLimit.getValue()) { - clearOutUnclaimedInst.add(key); - } - } + * Attempt to claim a process start request in the database. + * + * @param workerProcsList -- attempts to claim rows for the active set of process definition(s) + * @return mappings of claimUuids and claimedRowUuids + * + */ + + public Map> claimHighestPriorityStartReq(String workerId, Map workerProcsList, Map limitsPerProcs, int limit) { + List claimUuids = new ArrayList(); + List rowUuids = new ArrayList(); + List> rowUuidsPerProcDefKey = new ArrayList>(); + LinkedHashMap uuidAndProcDefKeyPair = new LinkedHashMap(); + List clearOutUnclaimedInst = new ArrayList(); + List> unfilteredProcesses = new ArrayList>(); + List claimedRowUuids = new ArrayList(); + long t0 = System.currentTimeMillis(); + int numClaimed = 0; + String claimUuid = null; + int attempts = 0; + + // Try, until succeeding in claiming at least one row + // + while (attempts++ < 10) { + try { + // Find claimable rows + // + for (Map.Entry procs : limitsPerProcs.entrySet()) { + rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, new Object[] {procs.getKey(), procs.getValue()*2}); + // get list of uuids using array of procdefkeys IN (keys) + unfilteredProcesses.addAll(rowUuidsPerProcDefKey); + } + + unfilteredProcesses.sort(new Comparator>() { + public int compare(Map one, Map two) { + return ((Integer) one.get("priority")).compareTo((Integer) two.get("priority")); } - } - - for (String removeUuidFromList : clearOutUnclaimedInst) { - uuidAndProcDefKeyPair.remove(removeUuidFromList); - } - - Set uuidKeys = uuidAndProcDefKeyPair.keySet(); - // after its filtered add the uuids to rowUuids arraylist - for (String key : uuidKeys) { - rowUuids.add(key); - } - - // make query that uses multi limit per ProcDefkey (JOIN) - // iterate to grab 30 - if (!rowUuids.isEmpty()) { - // Found some claimable rows, so now try to claim them.. - // - for (String uuid : rowUuids) { - claimUuid = UUID.randomUUID().toString(); - int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL, - new Object[]{workerId, claimUuid, uuid, workerId}); - - if (updateCount == 1) { - numClaimed++; - claimUuids.add(claimUuid); - claimedRowUuids.add(uuid); - //log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'"); - } - - if (numClaimed == limit) { - break; // we have claimed up to the limit, so stop claiming - } - } - - if (numClaimed == 0) { - // other workers beat us to claiming the rows - log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " + - (attempts < 10 ? "Retrying..." : "GIVING UP!")); - continue; // retry finding claimable rows - } else { - log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'"); - } - } else if (log.isTraceEnabled()) { - log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME"); - } - - break; // no retry needed - } catch (DeadlockLoserDataAccessException e) { - if (attempts == 10) { - log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!.."); - break; // give up - } - log.warn("Caught a DeadlockLoserDataAccessException. Retrying.."); - continue; // retry - } catch (Throwable t) { - log.error("Unexpected exception. Not retrying..", t); - break; // abort - } - } // end while (attempts) - - long timeTaken = System.currentTimeMillis() - t0; - if (timeTaken > SLOW_WARN_THRESHOLD) { - log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!"); - } - if (numClaimed >= 1) { - log.info("worker " + workerId + " claimed " + numClaimed + " row(s)."); - } else { - log.trace("no rows claimed by worker: " + workerId); - } - - if (numClaimed != claimUuids.size()) { - log.error("numUpdated != claimUuids.size()"); - } - - Map> ret = new HashMap>(); - ret.put("claimUuids", claimUuids); - ret.put("claimedRowUuids", claimedRowUuids); - - return ret; - } + }); + + for (Map proc : unfilteredProcesses) { + String uuid = (String) proc.get("uuid"); + String procDefKeyString = getProcDefKeyFromUuid(uuid); + uuidAndProcDefKeyPair.put(uuid, procDefKeyString); + } + + for (Map.Entry procLimit : limitsPerProcs.entrySet()) { + Set keys = uuidAndProcDefKeyPair.keySet(); + int applyPerProcsCap = 0; + for (String key : keys) { + + if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) { + applyPerProcsCap = applyPerProcsCap + 1; + if (applyPerProcsCap > procLimit.getValue()) { + clearOutUnclaimedInst.add(key); + } + } + } + } + + for (String removeUuidFromList : clearOutUnclaimedInst) { + uuidAndProcDefKeyPair.remove(removeUuidFromList); + } + + Set uuidKeys = uuidAndProcDefKeyPair.keySet(); + // after its filtered add the uuids to rowUuids arraylist + for (String key : uuidKeys) { + rowUuids.add(key); + } + + // make query that uses multi limit per ProcDefkey (JOIN) + // iterate to grab 30 + if (!rowUuids.isEmpty()) { + // Found some claimable rows, so now try to claim them.. + // + for (String uuid : rowUuids) { + claimUuid = UUID.randomUUID().toString(); + int updateCount = jdbcTemplate.update(UPDATE_CLAIMABLE_ROW_SQL, + new Object[] {workerId, claimUuid, uuid, workerId}); + + if (updateCount == 1) { + numClaimed++; + claimUuids.add(claimUuid); + claimedRowUuids.add(uuid); + //log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'"); + } + + if (numClaimed == limit) { + break; // we have claimed up to the limit, so stop claiming + } + } + + if (numClaimed == 0) { + // other workers beat us to claiming the rows + log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " + + (attempts < 10 ? "Retrying..." : "GIVING UP!")); + continue; // retry finding claimable rows + } + else { + log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'"); + } + } + else if (log.isTraceEnabled()) { + log.trace("NO CLAIMABLE CANDIDATES AT THIS TIME"); + } + + break; // no retry needed + } + catch (DeadlockLoserDataAccessException e) { + if (attempts == 10) { + log.error("Caught a DeadlockLoserDataAccessException. NOT Retrying as 10 attempts have been tried already!.."); + break; // give up + } + log.warn("Caught a DeadlockLoserDataAccessException. Retrying.."); + continue; // retry + } + catch (Throwable t) { + log.error("Unexpected exception. Not retrying..", t); + break; // abort + } + } // end while (attempts) + + long timeTaken = System.currentTimeMillis() - t0; + if (timeTaken > SLOW_WARN_THRESHOLD) { + log.warn("CLAIM cws_sched_worker_proc_inst took " + timeTaken + " ms!"); + } + if (numClaimed >= 1) { + log.info("worker " + workerId + " claimed " + numClaimed + " row(s)."); + } + else { + log.trace("no rows claimed by worker: " + workerId); + } + + if (numClaimed != claimUuids.size()) { + log.error("numUpdated != claimUuids.size()" ); + } + + Map> ret = new HashMap>(); + ret.put("claimUuids", claimUuids); + ret.put("claimedRowUuids", claimedRowUuids); + + return ret; + } public String getProcInstRowStatus(String uuid) { - List> list = jdbcTemplate.queryForList( - "SELECT status FROM cws_sched_worker_proc_inst " + - "WHERE uuid=?", - new Object[]{uuid}); - if (list != null && !list.isEmpty()) { - return list.iterator().next().values().iterator().next().toString(); - } else { - return null; - } - } + List> list = jdbcTemplate.queryForList( + "SELECT status FROM cws_sched_worker_proc_inst " + + "WHERE uuid=?", + new Object[] {uuid}); + if (list != null && !list.isEmpty()) { + return list.iterator().next().values().iterator().next().toString(); + } + else { + return null; + } + } public int getMaxProcsValueForWorker(String workerId) { - return jdbcTemplate.queryForObject( - "SELECT max_num_running_procs FROM cws_worker WHERE id=?", - new Object[]{workerId}, Integer.class); - } + return jdbcTemplate.queryForObject( + "SELECT max_num_running_procs FROM cws_worker WHERE id=?", + new Object[] {workerId}, Integer.class); + } public int getCountForClaimedProcInstPerKey(String procDefKey, List claimedUuids) { - String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\""; - String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")"; - return jdbcTemplate.queryForObject(query, Integer.class); - } + String listOfClaimUuid = "\"" + String.join("\", \"", claimedUuids) + "\"" ; + String query = "SELECT count(*) FROM cws_sched_worker_proc_inst " + "WHERE proc_def_key='" + procDefKey + "' " + "AND claim_uuid IN (" + listOfClaimUuid + ")"; + return jdbcTemplate.queryForObject(query, Integer.class); + } public List> getProcDefKeyLatestCompleteInst(String procDefKey) { return jdbcTemplate.queryForList( @@ -1121,6 +1133,7 @@ public List> getProcessInstanceStats(String lastNumHours) { List> camundaAndCwsStatuses = jdbcTemplate.queryForList(query, time, time, time, time); +<<<<<<< HEAD ret.addAll(camundaAndCwsStatuses); return ret; @@ -1628,3 +1641,8 @@ public int retryFailedToStart(List uuids) { return jdbcTemplate.update(query); } } +======= + return jdbcTemplate.update(query); + } +} +>>>>>>> aa10384 (Jason work to fix worker priority scheduling)