Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #837 from Thingographist/master
Browse files Browse the repository at this point in the history
no-op-node for task-tagged-constraints
  • Loading branch information
lbradstreet authored Jan 22, 2018
2 parents 2530c59 + 81d41e5 commit 05135d9
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 180 deletions.
48 changes: 25 additions & 23 deletions src/onyx/scheduling/common_job_scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@
(defn assign-coordinators [{:keys [coordinators allocations] :as replica}]
(reduce (fn [r job-id]
(let [job-peers (set (common/replica->job-peers replica job-id))
curr-coordinator (get-in r [:coordinators job-id])]
(if (get job-peers curr-coordinator)
curr-coordinator (get-in r [:coordinators job-id])]
(if (get job-peers curr-coordinator)
r
(let [candidate (-> job-peers sort first)]
(assoc-in r [:coordinators job-id] candidate)))))
(assoc-in r [:coordinators job-id] candidate)))))
replica
(keys allocations)))

Expand Down Expand Up @@ -215,7 +215,7 @@
[]
task-seq))

(defn to-node-array ^"[Lorg.btrplace.model.Node;"
(defn to-node-array ^"[Lorg.btrplace.model.Node;"
[nodes]
(into-array Node nodes))

Expand Down Expand Up @@ -308,7 +308,7 @@
(let [prev-task-peers (get-in original-replica [:allocations job-id task-id])]
(if-not (some #{peer-id} prev-task-peers)
(do
(assert
(assert
(some #{peer-id} (get-in new-replica [:allocations job-id task-id])))
(update-peer-site result job-id task-id peer-id))
result))
Expand All @@ -329,10 +329,10 @@
(reduce-kv
(fn [result peer-id [job-id task-id]]
(let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)]
(if (and (or (nil? task-id)
(if (and (or (nil? task-id)
(not (and (= (:job prev-allocation) job-id)
(= (:task prev-allocation) task-id))))
(get (:task-slot-ids new-replica) (:job prev-allocation)))
(get (:task-slot-ids new-replica) (:job prev-allocation)))
(update-in result [:task-slot-ids (:job prev-allocation) (:task prev-allocation)] dissoc peer-id)
result)))
new-replica
Expand Down Expand Up @@ -376,15 +376,17 @@
task-seq))
0))

(defn task-tagged-constraints [replica peers peer->vm task->node jobs]
(defn task-tagged-constraints [replica peers peer->vm task->node jobs no-op-node]
(let [utasks (unconstrained-tasks replica jobs)]
(map
(fn [peer]
(let [peer-tags (get-in replica [:peer-tags peer])
ctasks (constrainted-tasks-for-peer replica jobs peer-tags)]
(Among.
[(peer->vm peer)]
[(map task->node ctasks)
[(if (not-empty ctasks)
(conj (map task->node ctasks) no-op-node)
ctasks)
(map task->node utasks)])))
(filter #(seq (get-in replica [:peer-tags %])) peers))))

Expand Down Expand Up @@ -416,7 +418,7 @@
(reduce
into
[(capacity-constraints replica job-utilization task-seq task->node capacities)
(task-tagged-constraints replica (:peers replica) peer->vm task->node jobs)
(task-tagged-constraints replica (:peers replica) peer->vm task->node jobs no-op-node)
(no-tagged-peers-constraints replica (:peers replica) peer->vm task->node jobs no-op-node)
(peer-running-constraints peer->vm)
(grouping-task-constraints replica task-seq task->node peer->vm)
Expand Down Expand Up @@ -445,8 +447,8 @@
{}
jobs))

(defn add-allocation-versions
"Adds version numbers to jobs whenever an allocation changes for that job.
(defn add-allocation-versions
"Adds version numbers to jobs whenever an allocation changes for that job.
This gives a measure of validity of messages and barriers that transit through the system."
[new old]
(reduce (fn [replica job-id]
Expand Down Expand Up @@ -504,31 +506,31 @@
(->> allocations
;; flatten job-id / task-id
(mapcat (fn [[job-id task-peers]]
(map (fn [task-id] [job-id task-id])
(map (fn [task-id] [job-id task-id])
(keys task-peers))))
;; flatten job-id / task-id / slot-id
(mapcat (fn [[job-id task-id]]
(map (fn [slot-id]
[job-id task-id slot-id])
(mapcat (fn [[job-id task-id]]
(map (fn [slot-id]
[job-id task-id slot-id])
(slot-ids replica job-id task-id))))
;; flatten job-id / task-id / src-peer-id
(mapcat (fn [[job-id task-id slot-id]]
(mapcat (fn [[job-id task-id slot-id]]
(map (fn [src-peer-id]
[job-id task-id slot-id src-peer-id])
(src-peers replica job-id task-id))))
(map (fn [[job task slot [peer-type peer-id]]]
{:src-peer-type peer-type
:src-peer-id peer-id
:job-id job
:job-id job
:dst-task-id task
:msg-slot-id slot}))))

(defn add-messaging-short-ids
(defn add-messaging-short-ids
"Converts long form messaging identifies into short int identifiers, for
quick comparison and reduced message size."
quick comparison and reduced message size."
[replica]
(assoc replica
:message-short-ids
(assoc replica
:message-short-ids
(->> replica
messaging-long-form
(set)
Expand All @@ -544,7 +546,7 @@
(add-allocation-versions old)
(add-messaging-short-ids))]
(run!
(fn [f]
(fn [f]
(assert (f updated) {:before new
:after updated}))
[;invariants/version-invariant
Expand Down
Loading

0 comments on commit 05135d9

Please sign in to comment.