From 81d41e58dc2d5870a7939bb68a4e0daa103310e6 Mon Sep 17 00:00:00 2001 From: khoda Date: Thu, 4 Jan 2018 13:34:40 +0700 Subject: [PATCH] no-op-node for task-tagged-constraints --- src/onyx/scheduling/common_job_scheduler.clj | 48 +-- .../scheduler/tagged_constraints_test.clj | 383 +++++++++++------- 2 files changed, 251 insertions(+), 180 deletions(-) diff --git a/src/onyx/scheduling/common_job_scheduler.clj b/src/onyx/scheduling/common_job_scheduler.clj index 9ae6b8104..34ee3bc90 100644 --- a/src/onyx/scheduling/common_job_scheduler.clj +++ b/src/onyx/scheduling/common_job_scheduler.clj @@ -109,11 +109,11 @@ (defn assign-coordinators [{:keys [coordinators allocations] :as replica}] (reduce (fn [r job-id] (let [job-peers (set (common/replica->job-peers replica job-id)) - curr-coordinator (get-in r [:coordinators job-id])] - (if (get job-peers curr-coordinator) + curr-coordinator (get-in r [:coordinators job-id])] + (if (get job-peers curr-coordinator) r (let [candidate (-> job-peers sort first)] - (assoc-in r [:coordinators job-id] candidate))))) + (assoc-in r [:coordinators job-id] candidate))))) replica (keys allocations))) @@ -215,7 +215,7 @@ [] task-seq)) -(defn to-node-array ^"[Lorg.btrplace.model.Node;" +(defn to-node-array ^"[Lorg.btrplace.model.Node;" [nodes] (into-array Node nodes)) @@ -308,7 +308,7 @@ (let [prev-task-peers (get-in original-replica [:allocations job-id task-id])] (if-not (some #{peer-id} prev-task-peers) (do - (assert + (assert (some #{peer-id} (get-in new-replica [:allocations job-id task-id]))) (update-peer-site result job-id task-id peer-id)) result)) @@ -329,10 +329,10 @@ (reduce-kv (fn [result peer-id [job-id task-id]] (let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)] - (if (and (or (nil? task-id) + (if (and (or (nil? task-id) (not (and (= (:job prev-allocation) job-id) (= (:task prev-allocation) task-id)))) - (get (:task-slot-ids new-replica) (:job prev-allocation))) + (get (:task-slot-ids new-replica) (:job prev-allocation))) (update-in result [:task-slot-ids (:job prev-allocation) (:task prev-allocation)] dissoc peer-id) result))) new-replica @@ -376,7 +376,7 @@ task-seq)) 0)) -(defn task-tagged-constraints [replica peers peer->vm task->node jobs] +(defn task-tagged-constraints [replica peers peer->vm task->node jobs no-op-node] (let [utasks (unconstrained-tasks replica jobs)] (map (fn [peer] @@ -384,7 +384,9 @@ ctasks (constrainted-tasks-for-peer replica jobs peer-tags)] (Among. [(peer->vm peer)] - [(map task->node ctasks) + [(if (not-empty ctasks) + (conj (map task->node ctasks) no-op-node) + ctasks) (map task->node utasks)]))) (filter #(seq (get-in replica [:peer-tags %])) peers)))) @@ -416,7 +418,7 @@ (reduce into [(capacity-constraints replica job-utilization task-seq task->node capacities) - (task-tagged-constraints replica (:peers replica) peer->vm task->node jobs) + (task-tagged-constraints replica (:peers replica) peer->vm task->node jobs no-op-node) (no-tagged-peers-constraints replica (:peers replica) peer->vm task->node jobs no-op-node) (peer-running-constraints peer->vm) (grouping-task-constraints replica task-seq task->node peer->vm) @@ -445,8 +447,8 @@ {} jobs)) -(defn add-allocation-versions - "Adds version numbers to jobs whenever an allocation changes for that job. +(defn add-allocation-versions + "Adds version numbers to jobs whenever an allocation changes for that job. This gives a measure of validity of messages and barriers that transit through the system." [new old] (reduce (fn [replica job-id] @@ -504,31 +506,31 @@ (->> allocations ;; flatten job-id / task-id (mapcat (fn [[job-id task-peers]] - (map (fn [task-id] [job-id task-id]) + (map (fn [task-id] [job-id task-id]) (keys task-peers)))) ;; flatten job-id / task-id / slot-id - (mapcat (fn [[job-id task-id]] - (map (fn [slot-id] - [job-id task-id slot-id]) + (mapcat (fn [[job-id task-id]] + (map (fn [slot-id] + [job-id task-id slot-id]) (slot-ids replica job-id task-id)))) ;; flatten job-id / task-id / src-peer-id - (mapcat (fn [[job-id task-id slot-id]] + (mapcat (fn [[job-id task-id slot-id]] (map (fn [src-peer-id] [job-id task-id slot-id src-peer-id]) (src-peers replica job-id task-id)))) (map (fn [[job task slot [peer-type peer-id]]] {:src-peer-type peer-type :src-peer-id peer-id - :job-id job + :job-id job :dst-task-id task :msg-slot-id slot})))) -(defn add-messaging-short-ids +(defn add-messaging-short-ids "Converts long form messaging identifies into short int identifiers, for - quick comparison and reduced message size." + quick comparison and reduced message size." [replica] - (assoc replica - :message-short-ids + (assoc replica + :message-short-ids (->> replica messaging-long-form (set) @@ -544,7 +546,7 @@ (add-allocation-versions old) (add-messaging-short-ids))] (run! - (fn [f] + (fn [f] (assert (f updated) {:before new :after updated})) [;invariants/version-invariant diff --git a/test/onyx/scheduler/tagged_constraints_test.clj b/test/onyx/scheduler/tagged_constraints_test.clj index e1f183d6c..a9eec4898 100644 --- a/test/onyx/scheduler/tagged_constraints_test.clj +++ b/test/onyx/scheduler/tagged_constraints_test.clj @@ -3,6 +3,7 @@ [clojure.test.check :as tc] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] + [clojure.data :refer [diff]] [com.gfredericks.test.chuck :refer [times]] [com.gfredericks.test.chuck.clojure-test :refer [checking]] [onyx.scheduling.common-job-scheduler :refer [reconfigure-cluster-workload]] @@ -11,167 +12,235 @@ [onyx.static.planning :as planning] [onyx.api])) +(def old-replica {:allocations {}}) + +(deftest diff-replicas-after-reconfigured + (let [new-replica (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3] + :peer-state {:p1 :idle :p2 :idle :p3 :idle} + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic] + :t2 [:datomic] + :t3 [:datomic]}} + :peer-tags {:p1 [:datomic] + :p2 [:datomic] + :p3 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + reconfigured (reconfigure-cluster-workload new-replica old-replica)] + (is + (= '({:allocations nil} + {:allocations {:j1 {:t2 [:p1] + :t3 [:p2] + :t1 [:p3]}} + :peer-sites {:p1 {} :p2 {} :p3 {}} + :task-slot-ids {:j1 {:t2 {:p1 0} + :t3 {:p2 0} + :t1 {:p3 0}}} + :allocation-version {:j1 nil} + :coordinators {:j1 :p1} + :message-short-ids {}}) + (butlast (diff new-replica reconfigured)))))) + +(deftest peers-allocated-with-tags-if-peers-more-than-necessary + (is + (= {:j1 {:t1 [:p1] + :t2 [:p6] + :t3 [:p2]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3 :p4 :p5 :p6] + :peer-state {:p1 :idle :p2 :idle :p3 :idle :p4 :idle :p5 :idle :p6 :idle} + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :min-required-peers {:j1 {:t1 1 :t2 1 :t3 1}}, + :saturation {:j1 3} + :task-saturation {:j1 {:t1 1 :t2 1 :t3 1}} + :required-tags {:j1 {:t1 [:s3] + :t2 [:datomic] + :t3 [:datomic]}} + :peer-tags {:p1 [:s3] :p2 [:datomic] :p3 [:datomic] :p4 [:s3] :p5 [:s3] :p6 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) + ;; Following tests are broken because reconfigure-cluster-workload ;; now takes an old and a new replica -; (deftest no-peers-are-allocated-missing-tags -; (is -; (= {} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3] -; :peer-state {:p1 :idle :p2 :idle :p3 :idle} -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [:datomic] -; :t2 [:datomic] -; :t3 [:datomic]}} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest no-peers-are-allocated-missing-tags + (is + (= {} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3] + :peer-state {:p1 :idle :p2 :idle :p3 :idle} + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic] + :t2 [:datomic] + :t3 [:datomic]}} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) + -; (deftest peers-allocated-with-tags -; (is -; (= {:j1 {:t1 [:p3] -; :t2 [:p2] -; :t3 [:p1]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3] -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [:datomic] -; :t2 [:datomic] -; :t3 [:datomic]}} -; :peer-tags {:p1 [:datomic] :p2 [:datomic] :p3 [:datomic]} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest peers-allocated-with-tags + (is + (= {:j1 {:t1 [:p3] + :t2 [:p1] + :t3 [:p2]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3] + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic] + :t2 [:datomic] + :t3 [:datomic]}} + :peer-tags {:p1 [:datomic] :p2 [:datomic] :p3 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) -; (deftest only-tagged-peers-allocated -; (is -; (= {:j1 {:t1 [:p4] -; :t2 [:p2] -; :t3 [:p1]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3 :p4] -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [:datomic] -; :t2 [:datomic] -; :t3 [:datomic]}} -; :peer-tags {:p1 [:datomic] -; :p2 [:datomic] -; :p3 [] -; :p4 [:datomic]} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest only-tagged-peers-allocated + (is + (= {:j1 {:t1 [:p4] + :t2 [:p1] + :t3 [:p2]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3 :p4] + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic] + :t2 [:datomic] + :t3 [:datomic]}} + :peer-tags {:p1 [:datomic] + :p2 [:datomic] + :p3 [] + :p4 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) -; (deftest one-task-tagged -; (is -; (= {:j1 {:t1 [:p1] -; :t2 [:p3] -; :t3 [:p2]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3] -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [:datomic]}} -; :peer-tags {:p1 [:datomic]} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest one-task-tagged + (is + (= {:j1 {:t1 [:p1] + :t2 [:p3] + :t3 [:p2]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3] + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic]}} + :peer-tags {:p1 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) -; (deftest one-task-tagged-max-peers -; (is -; (= {:j1 {:t1 [:p1] -; :t2 [:p3 :p4] -; :t3 [:p2]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3 :p4] -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [:datomic]}} -; :peer-tags {:p1 [:datomic]} -; :task-saturation {:j1 {:t1 1}} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest one-task-tagged-max-peers + (is + (= {:j1 {:t1 [:p1] + :t2 [:p3 :p4] + :t3 [:p2]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3 :p4] + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [:datomic]}} + :peer-tags {:p1 [:datomic]} + :task-saturation {:j1 {:t1 1}} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) -; (deftest two-tags -; (is -; (= {:j1 {:t1 [:p1] -; :t2 [:p2] -; :t3 [:p3]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1] -; :allocations {} -; :peers [:p1 :p2 :p3] -; :tasks {:j1 [:t1 :t2 :t3]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :required-tags {:j1 {:t1 [] -; :t2 [:mysql :datomic] -; :t3 [:datomic]}} -; :peer-tags {:p1 [] -; :p2 [:datomic :mysql] -; :p3 [:datomic]} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest two-tags + (is + (= {:j1 {:t1 [:p1] + :t2 [:p2] + :t3 [:p3]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1] + :allocations {} + :peers [:p1 :p2 :p3] + :tasks {:j1 [:t1 :t2 :t3]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :required-tags {:j1 {:t1 [] + :t2 [:mysql :datomic] + :t3 [:datomic]}} + :peer-tags {:p1 [] + :p2 [:datomic :mysql] + :p3 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) -; (deftest two-jobs -; (is -; (= {:j1 {:t1 [:p7] -; :t2 [:p3] -; :t3 [:p5]} -; :j2 {:t4 [:p8] -; :t5 [:p4] -; :t6 [:p1]}} -; (:allocations -; (reconfigure-cluster-workload -; (one-group -; {:jobs [:j1 :j2] -; :allocations {:j1 {:t1 [:p7] -; :t2 [:p3 :p4 :p5] -; :t3 [:p8]}} -; :peers [:p1 :p3 :p4 :p5 :p7 :p8] -; :tasks {:j1 [:t1 :t2 :t3] -; :j2 [:t4 :t5 :t6]} -; :task-schedulers {:j1 :onyx.task-scheduler/balanced -; :j2 :onyx.task-scheduler/balanced} -; :job-scheduler :onyx.job-scheduler/balanced -; :task-saturation {:j1 {:t1 1 :t2 42 :t3 1} -; :t2 {:t4 1 :t5 42 :t6 1}} -; :required-tags {:j1 {:t1 [:datomic] -; :t2 [] -; :t3 []} -; :j2 {:t4 [:datomic] -; :t5 [] -; :t6 []}} -; :peer-tags {:p7 [:datomic] -; :p8 [:datomic]} -; :messaging {:onyx.messaging/impl :aeron}})))))) +(deftest two-jobs + (is + (= {:j1 {:t1 [:p7] + :t2 [:p3] + :t3 [:p5]} + :j2 {:t4 [:p8] + :t5 [:p4] + :t6 [:p1]}} + (:allocations + (reconfigure-cluster-workload + (one-group + {:jobs [:j1 :j2] + :allocations {:j1 {:t1 [:p7] + :t2 [:p3 :p4 :p5] + :t3 [:p8]}} + :peers [:p1 :p3 :p4 :p5 :p7 :p8] + :tasks {:j1 [:t1 :t2 :t3] + :j2 [:t4 :t5 :t6]} + :task-schedulers {:j1 :onyx.task-scheduler/balanced + :j2 :onyx.task-scheduler/balanced} + :job-scheduler :onyx.job-scheduler/balanced + :task-saturation {:j1 {:t1 1 :t2 42 :t3 1} + :t2 {:t4 1 :t5 42 :t6 1}} + :required-tags {:j1 {:t1 [:datomic] + :t2 [] + :t3 []} + :j2 {:t4 [:datomic] + :t5 [] + :t6 []}} + :peer-tags {:p7 [:datomic] + :p8 [:datomic]} + :messaging {:onyx.messaging/impl :aeron}}) + old-replica))))) ; (def onyx-id "tagged-gen-test-id") @@ -274,7 +343,7 @@ ; (is (= #{[:g2-p9 [:special-peer]] [:g2-p10 [:special-peer]]} (set (remove (comp nil? val) (:peer-tags replica))))) ; (is (= 8 (count (:peers replica)))) ; (is (= [4 4] -; (map (comp (partial apply +) vals) +; (map (comp (partial apply +) vals) ; (get-counts replica ; [{:job-id job-1-id} ; {:job-id job-2-id}])))) @@ -310,7 +379,7 @@ ; job-1-id ; peer-config ; job-1 -; (planning/discover-tasks (:catalog job-1) (:workflow job-1)))] +; (planning/discover-tasks (:catalog job-1) (:workflow job-1)))] ; (checking ; "Tagged peer leaves, deallocates the job requiring that peer" ; (times 50) @@ -332,7 +401,7 @@ ; (is (= #{} (set (remove (comp nil? val) (:peer-tags replica))))) ; (is (= 3 (count (:peers replica)))) ; (is (= [0] -; (map (comp (partial apply +) vals) +; (map (comp (partial apply +) vals) ; (get-counts replica ; [{:job-id job-1-id}]))))))) @@ -397,7 +466,7 @@ ; job-3-id ; peer-config ; job-3 -; (planning/discover-tasks (:catalog job-3) (:workflow job-3)))] +; (planning/discover-tasks (:catalog job-3) (:workflow job-3)))] ; (checking ; "More peers than necessary are tagged, job is killed, still has balanced allocation" ; (times 50) @@ -412,7 +481,7 @@ ; (log-gen/generate-join-queues (log-gen/generate-group-and-peer-ids 2 1 11 10))) ; :job-1 {:queue [job-entry-1]} ; :job-2 {:queue [job-entry-2]} -; :job-3 {:queue [job-entry-3 +; :job-3 {:queue [job-entry-3 ; {:fn :kill-job :args {:job job-3-id}}]} ; :leave-untagged {:predicate (fn [replica entry] ; (some #{:g2-p14} (:peers replica)))