From eddc4de3f5d86ede89671dc498493652d31b4c12 Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Mon, 1 Feb 2021 12:44:29 +0100 Subject: [PATCH] feat(scheduler): Support multiple receivers The scheduler now also accepts the `:from` field in the events sent from the executor to either be a single `string?` or a collection of `string?` which represents a broadcast. --- src/scheduler/src/scheduler/pure.clj | 36 +++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/scheduler/src/scheduler/pure.clj b/src/scheduler/src/scheduler/pure.clj index 51027f97..95e04d8c 100644 --- a/src/scheduler/src/scheduler/pure.clj +++ b/src/scheduler/src/scheduler/pure.clj @@ -258,6 +258,19 @@ (def entries? (s/coll-of entry? :kind vector?)) + +(s/def :ext/to + (s/or :singleton string? + :set (s/coll-of string? :kind vector?))) + +(def ext-entry? + (s/and (s/keys :req-un [::kind + ::event + ::args + :ext/to + ::from]) + #(not (#{"timer"} (:kind %))))) + (def timer? (s/and (s/keys :req-un [::kind ::args @@ -269,8 +282,22 @@ (s/or :entry entry? :timer timer?)) +(def ext-event? + (s/or :entry ext-entry? + :timer timer?)) + (def events? (s/coll-of event? :kind vector?)) +(>defn expand-events + [events] + [(s/coll-of ext-event?) => (s/coll-of event? :kind vector?)] + (let [up (fn [event] + (case (:kind event) + "timer" [event] + (if (string? (:to event)) + [event] + (mapv #(assoc event :to %) (:to event)))))] + (vec (mapcat up events)))) (s/def ::events events?) (defn error-state? @@ -368,7 +395,8 @@ [events (-> (client/post (str url (if (= (:kind body) "timer") "timer" "event")) {:body (json/write body) :content-type "application/json; charset=utf-8"}) :body - json/read)] + json/read + (update :events expand-events))] ;; TODO(stevan): go into error state if response body is of form {"error": ...} ;; (if (:error events) true false) @@ -583,7 +611,8 @@ (-> (client/get (str executor-id "/inits")) :body json/read - :events)) + :events + expand-events)) _ (log/debug :get-initial-events executor-id events (:state data)) [data' timestamped-entries] (timestamp-entries data events (:clock data)) [data'' queue-size] (enqueue-timestamped-entries data' timestamped-entries)] @@ -639,7 +668,8 @@ :component component}) :content-type "application/json; charset=utf-8"}) :body - json/read)] + json/read + (update :events expand-events))] (assert (= (keys events) '(:events)) (str "execute!: unexpected response body: " events)) (doseq [event (:events events)]