Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Fix incorrect slot-id allocation #504
Browse files Browse the repository at this point in the history
Caused by not doing a de-allocation pass before an allocation pass.
This meant that a slot that should be taken by an allocation was not
available when the allocation occurred before the deallocation.
  • Loading branch information
lbradstreet committed Jan 26, 2016
1 parent b62f8b5 commit a66584e
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 22 deletions.
46 changes: 24 additions & 22 deletions src/onyx/scheduling/common_job_scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -241,30 +241,32 @@
(defn update-slot-id-for-peer [replica job-id task-id peer-id]
(update-in replica [:task-slot-ids job-id task-id]
(fn [slot-ids]
(let [slot-id (first (remove (set (vals slot-ids)) (range)))]
(assoc slot-ids peer-id slot-id)))))
(if (and slot-ids (slot-ids peer-id))
;; already allocated
slot-ids
(let [slot-id (first (remove (set (vals slot-ids)) (range)))]
(assoc slot-ids peer-id slot-id))))))

(defn assign-task-slot-ids [new-replica original-replica peer->task]
(defn unassign-task-slot-ids [new-replica original-replica peer->task]
(reduce-kv
(fn [result peer-id [job-id task-id]]
(if (and job-id task-id)
(let [prev-task (get-in original-replica [:allocations job-id task-id])]
(if-not (some #{peer-id} prev-task)
(if-let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)]
(let [prev-job-id (:job prev-allocation)
prev-task-id (:task prev-allocation)]
(-> result
(update-in [:task-slot-ids prev-job-id prev-task-id] dissoc peer-id)
(update-slot-id-for-peer job-id task-id peer-id)))
(update-slot-id-for-peer result job-id task-id peer-id))
result))
(if-let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)]
(let [prev-job-id (:job prev-allocation)
prev-task-id (:task prev-allocation)]
(update-in result [:task-slot-ids prev-job-id prev-task-id] dissoc peer-id))
result)))
new-replica
peer->task))
(fn [result peer-id [job-id task-id]]
(let [prev-allocation (common/peer->allocated-job (:allocations original-replica) peer-id)]
(if (and (or (nil? task-id)
(not (= (:task prev-allocation) task-id)))
(get (:task-slot-ids new-replica) (:job prev-allocation)))
(update-in result [:task-slot-ids (:job prev-allocation) (:task prev-allocation)] dissoc peer-id)
result)))
new-replica
peer->task))

(defn assign-task-slot-ids [new-replica original peer->task]
(reduce-kv
(fn [result peer-id [job-id task-id]]
(if (and job-id task-id)
(update-slot-id-for-peer result job-id task-id peer-id)
result))
(unassign-task-slot-ids new-replica original peer->task)
peer->task))

(defn build-current-model [replica mapping task->node peer->vm]
(doseq [j (:jobs replica)]
Expand Down
105 changes: 105 additions & 0 deletions test/onyx/log/generative_peer_join.clj
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,108 @@
(is (= #{:active} (set (vals (:peer-state replica)))))
(is (running? (map count (vals (get (:allocations replica) inner-job-id)))))
(is (running? (map count (vals (get (:allocations replica) outer-job-id)))))))


(def slot-id-job-id #uuid "f55c14f0-a847-42eb-81bb-0c0390a88608")

(def slot-id-job
{:workflow [[:a :b] [:b :c] [:c :d] [:d :e] [:e :f] [:f :g]]
:catalog [{:onyx/name :a
:onyx/plugin :onyx.plugin.core-async/input
:onyx/type :input
:onyx/medium :core.async
:onyx/batch-size 20
:onyx/doc "Reads segments from a core.async channel"}

{:onyx/name :b
:onyx/fn :mock/fn
:onyx/n-peers 1
:onyx/type :function
:onyx/batch-size 20}

{:onyx/name :c
:onyx/fn :mock/fn
:onyx/n-peers 1
:onyx/type :function
:onyx/batch-size 20}

{:onyx/name :d
:onyx/fn :mock/fn
:onyx/n-peers 1
:onyx/type :function
:onyx/batch-size 20}

{:onyx/name :e
:onyx/fn :mock/fn
:onyx/n-peers 1
:onyx/type :function
:onyx/batch-size 20}

{:onyx/name :f
:onyx/fn :mock/fn
:onyx/n-peers 1
:onyx/type :function
:onyx/batch-size 20}

{:onyx/name :g
:onyx/plugin :onyx.plugin.core-async/output
:onyx/type :output
:onyx/n-peers 1
:onyx/medium :core.async
:onyx/batch-size 20
:onyx/doc "Writes segments to a core.async channel"}]
:task-scheduler :onyx.task-scheduler/balanced})

(deftest slot-id-after-peer-leave
(checking
"Checking peer leave is correctly performed"
(times 50)
[{:keys [replica log peer-choices]}
(log-gen/apply-entries-gen
(gen/return
{:replica {:job-scheduler :onyx.job-scheduler/balanced
:messaging {:onyx.messaging/impl :dummy-messenger}}
:message-id 0
:entries
(-> (log-gen/generate-join-queues (log-gen/generate-peer-ids 14))
(assoc :job-1 {:queue [(api/create-submit-job-entry
slot-id-job-id
peer-config
slot-id-job
(planning/discover-tasks (:catalog slot-id-job) (:workflow slot-id-job)))]})
(assoc :leave-1 {:predicate (fn [replica entry]
(some #{:p1} (:peers replica)))
:queue [{:fn :leave-cluster :args {:id :p1}}]})
(assoc :leave-2 {:predicate (fn [replica entry]
(some #{:p2} (:peers replica)))
:queue [{:fn :leave-cluster :args {:id :p2}}]})
(assoc :leave-3 {:predicate (fn [replica entry]
(some #{:p3} (:peers replica)))
:queue [{:fn :leave-cluster :args {:id :p3}}]})
(assoc :leave-4 {:predicate (fn [replica entry]
(some #{:p4} (:peers replica)))
:queue [{:fn :leave-cluster :args {:id :p4}}]})
(assoc :leave-5 {:predicate (fn [replica entry]
(some #{:p5} (:peers replica)))
:queue [{:fn :leave-cluster :args {:id :p5}}]}))
:log []
:peer-choices []}))]

;; check that task :a has more than one peer on it so we don't get confused
(is (not
(empty?
(filter #(> (count %) 1)
(vals (val (first (:task-slot-ids replica))))))))

(is (= #{'(0)}
(set (map vals
(filter #(= (count %) 1)
(vals (val (first (:task-slot-ids replica)))))))))

(standard-invariants replica)
(is (empty? (:accepted replica)))
(is (empty? (:prepared replica)))
;; peers may have left before they joined, so there should be at LEAST 7 peers allocated
;; since there are enough peers to handle 2 peers leaving without a task being deallocated the
;; job must be able to go on
(is (>= (apply + (map count (vals (get (:allocations replica) slot-id-job-id)))) 7))))

0 comments on commit a66584e

Please sign in to comment.