Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

no-op-node for task-tagged-constraints #837

Merged
merged 1 commit into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions src/onyx/scheduling/common_job_scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@
(defn assign-coordinators [{:keys [coordinators allocations] :as replica}]
(reduce (fn [r job-id]
(let [job-peers (set (common/replica->job-peers replica job-id))
curr-coordinator (get-in r [:coordinators job-id])]
(if (get job-peers curr-coordinator)
curr-coordinator (get-in r [:coordinators job-id])]
(if (get job-peers curr-coordinator)
r
(let [candidate (-> job-peers sort first)]
(assoc-in r [:coordinators job-id] candidate)))))
(assoc-in r [:coordinators job-id] candidate)))))
replica
(keys allocations)))

Expand Down Expand Up @@ -215,7 +215,7 @@
[]
task-seq))

(defn to-node-array ^"[Lorg.btrplace.model.Node;"
(defn to-node-array ^"[Lorg.btrplace.model.Node;"
[nodes]
(into-array Node nodes))

Expand Down Expand Up @@ -308,7 +308,7 @@
(let [prev-task-peers (get-in original-replica [:allocations job-id task-id])]
(if-not (some #{peer-id} prev-task-peers)
(do
(assert
(assert
(some #{peer-id} (get-in new-replica [:allocations job-id task-id])))
(update-peer-site result job-id task-id peer-id))
result))
Expand All @@ -329,10 +329,10 @@
(reduce-kv
(fn [result peer-id [job-id task-id]]
(let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)]
(if (and (or (nil? task-id)
(if (and (or (nil? task-id)
(not (and (= (:job prev-allocation) job-id)
(= (:task prev-allocation) task-id))))
(get (:task-slot-ids new-replica) (:job prev-allocation)))
(get (:task-slot-ids new-replica) (:job prev-allocation)))
(update-in result [:task-slot-ids (:job prev-allocation) (:task prev-allocation)] dissoc peer-id)
result)))
new-replica
Expand Down Expand Up @@ -376,15 +376,17 @@
task-seq))
0))

(defn task-tagged-constraints [replica peers peer->vm task->node jobs]
(defn task-tagged-constraints [replica peers peer->vm task->node jobs no-op-node]
(let [utasks (unconstrained-tasks replica jobs)]
(map
(fn [peer]
(let [peer-tags (get-in replica [:peer-tags peer])
ctasks (constrainted-tasks-for-peer replica jobs peer-tags)]
(Among.
[(peer->vm peer)]
[(map task->node ctasks)
[(if (not-empty ctasks)
(conj (map task->node ctasks) no-op-node)
ctasks)
(map task->node utasks)])))
(filter #(seq (get-in replica [:peer-tags %])) peers))))

Expand Down Expand Up @@ -416,7 +418,7 @@
(reduce
into
[(capacity-constraints replica job-utilization task-seq task->node capacities)
(task-tagged-constraints replica (:peers replica) peer->vm task->node jobs)
(task-tagged-constraints replica (:peers replica) peer->vm task->node jobs no-op-node)
(no-tagged-peers-constraints replica (:peers replica) peer->vm task->node jobs no-op-node)
(peer-running-constraints peer->vm)
(grouping-task-constraints replica task-seq task->node peer->vm)
Expand Down Expand Up @@ -445,8 +447,8 @@
{}
jobs))

(defn add-allocation-versions
"Adds version numbers to jobs whenever an allocation changes for that job.
(defn add-allocation-versions
"Adds version numbers to jobs whenever an allocation changes for that job.
This gives a measure of validity of messages and barriers that transit through the system."
[new old]
(reduce (fn [replica job-id]
Expand Down Expand Up @@ -504,31 +506,31 @@
(->> allocations
;; flatten job-id / task-id
(mapcat (fn [[job-id task-peers]]
(map (fn [task-id] [job-id task-id])
(map (fn [task-id] [job-id task-id])
(keys task-peers))))
;; flatten job-id / task-id / slot-id
(mapcat (fn [[job-id task-id]]
(map (fn [slot-id]
[job-id task-id slot-id])
(mapcat (fn [[job-id task-id]]
(map (fn [slot-id]
[job-id task-id slot-id])
(slot-ids replica job-id task-id))))
;; flatten job-id / task-id / src-peer-id
(mapcat (fn [[job-id task-id slot-id]]
(mapcat (fn [[job-id task-id slot-id]]
(map (fn [src-peer-id]
[job-id task-id slot-id src-peer-id])
(src-peers replica job-id task-id))))
(map (fn [[job task slot [peer-type peer-id]]]
{:src-peer-type peer-type
:src-peer-id peer-id
:job-id job
:job-id job
:dst-task-id task
:msg-slot-id slot}))))

(defn add-messaging-short-ids
(defn add-messaging-short-ids
"Converts long form messaging identifies into short int identifiers, for
quick comparison and reduced message size."
quick comparison and reduced message size."
[replica]
(assoc replica
:message-short-ids
(assoc replica
:message-short-ids
(->> replica
messaging-long-form
(set)
Expand All @@ -544,7 +546,7 @@
(add-allocation-versions old)
(add-messaging-short-ids))]
(run!
(fn [f]
(fn [f]
(assert (f updated) {:before new
:after updated}))
[;invariants/version-invariant
Expand Down
Loading