Skip to content

Commit

Permalink
feat(scheduler): Support multiple receivers
Browse files Browse the repository at this point in the history
The scheduler now also accepts the `:from` field in the events sent from the
executor to either be a single `string?` or a collection of `string?` which
represents a broadcast.
  • Loading branch information
symbiont-daniel-gustafsson committed Feb 1, 2021
1 parent 00104f1 commit eddc4de
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions src/scheduler/src/scheduler/pure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,19 @@

(def entries? (s/coll-of entry? :kind vector?))


(s/def :ext/to
(s/or :singleton string?
:set (s/coll-of string? :kind vector?)))

(def ext-entry?
(s/and (s/keys :req-un [::kind
::event
::args
:ext/to
::from])
#(not (#{"timer"} (:kind %)))))

(def timer?
(s/and (s/keys :req-un [::kind
::args
Expand All @@ -269,8 +282,22 @@
(s/or :entry entry?
:timer timer?))

(def ext-event?
(s/or :entry ext-entry?
:timer timer?))

(def events? (s/coll-of event? :kind vector?))

(>defn expand-events
[events]
[(s/coll-of ext-event?) => (s/coll-of event? :kind vector?)]
(let [up (fn [event]
(case (:kind event)
"timer" [event]
(if (string? (:to event))
[event]
(mapv #(assoc event :to %) (:to event)))))]
(vec (mapcat up events))))
(s/def ::events events?)

(defn error-state?
Expand Down Expand Up @@ -368,7 +395,8 @@
[events (-> (client/post (str url (if (= (:kind body) "timer") "timer" "event"))
{:body (json/write body) :content-type "application/json; charset=utf-8"})
:body
json/read)]
json/read
(update :events expand-events))]
;; TODO(stevan): go into error state if response body is of form {"error": ...}
;; (if (:error events) true false)

Expand Down Expand Up @@ -583,7 +611,8 @@
(-> (client/get (str executor-id "/inits"))
:body
json/read
:events))
:events
expand-events))
_ (log/debug :get-initial-events executor-id events (:state data))
[data' timestamped-entries] (timestamp-entries data events (:clock data))
[data'' queue-size] (enqueue-timestamped-entries data' timestamped-entries)]
Expand Down Expand Up @@ -639,7 +668,8 @@
:component component})
:content-type "application/json; charset=utf-8"})
:body
json/read)]
json/read
(update :events expand-events))]
(assert (= (keys events) '(:events))
(str "execute!: unexpected response body: " events))
(doseq [event (:events events)]
Expand Down

0 comments on commit eddc4de

Please sign in to comment.