Skip to content

Commit

Permalink
added failure tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Anderson committed Nov 30, 2015
1 parent 9226c33 commit c25b8f2
Showing 1 changed file with 46 additions and 14 deletions.
60 changes: 46 additions & 14 deletions test/onyx/plugin/input_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@

(def workflow [[:read-messages :out]])

(defn restartable? [e]
true)

;; Create catalogs with different search scenarios

(def catalog-base
Expand Down Expand Up @@ -113,6 +116,17 @@
(def out-calls
{:lifecycle/before-task-start inject-out-ch})

(def batch-num (atom 0))

(def read-crash
{:lifecycle/before-batch
(fn [event lifecycle]
; give the peer a bit of time to write the chunks out and ack the batches,
; since we want to ensure that the batches aren't re-read on restart for ease of testing
(Thread/sleep 7000)
(when (= (swap! batch-num inc) 2)
(throw (ex-info "Restartable" {:restartable? true}))))})

(def lifecycles
[{:lifecycle/task :read-messages
:lifecycle/calls :onyx.plugin.elasticsearch/read-messages-calls}
Expand All @@ -121,23 +135,37 @@
{:lifecycle/task :out
:lifecycle/calls :onyx.plugin.core-async/writer-calls}])

(def lifecycles-fail
[{:lifecycle/task :read-messages
:lifecycle/calls :onyx.plugin.elasticsearch/read-messages-calls}
{:lifecycle/task :read-messages
:lifecycle/calls ::read-crash}
{:lifecycle/task :out
:lifecycle/calls ::out-calls}
{:lifecycle/task :out
:lifecycle/calls :onyx.plugin.core-async/writer-calls}])

(def v-peers (onyx.api/start-peers 2 peer-group))

(defn submit-and-wait
[catalog]
(let [job-info (onyx.api/submit-job
peer-config
{:catalog catalog
:workflow workflow
:lifecycles lifecycles
:task-scheduler :onyx.task-scheduler/balanced})]
(onyx.api/await-job-completion peer-config (:job-id job-info))
job-info))
([catalog]
(submit-and-wait catalog lifecycles))
([catalog lc]
(let [job-info (onyx.api/submit-job
peer-config
{:catalog catalog
:workflow workflow
:lifecycles lc
:task-scheduler :onyx.task-scheduler/balanced})]
(onyx.api/await-job-completion peer-config (:job-id job-info))
job-info)))

(defn run-job
[catalog]
(submit-and-wait catalog)
(take-segments! out-chan 5000))
([catalog]
(run-job catalog lifecycles))
([catalog lc]
(submit-and-wait catalog lc)
(take-segments! out-chan 5000)))

(let [conn (u/connect-rest-client)]
(esrd/create conn (.toString id) "_default_" {:foo "bar"})
Expand All @@ -164,6 +192,8 @@
(let [job-info-restart (submit-and-wait (update-in catalog-http-q&map&idx [0] assoc :elasticsearch/restart-on-fail true))]
(def task-chunk-restart (extensions/read-chunk (:log env) :chunk (get-in job-info-restart [:task-ids :read-messages :id]))))

(def res-multi-fail (run-job (update-in catalog-http-q&map&idx [0] assoc :onyx/restart-pred-fn ::restartable?) lifecycles-fail))

(u/delete-indexes (.toString id)))

(doseq [v-peer v-peers]
Expand Down Expand Up @@ -194,9 +224,11 @@
(is (= "bar" (-> res-native-idx first :_source :foo)))))

(deftest fault-logic
(testing "Successfully processed all messages"
(testing "Successfully processed all messages no failure"
(is (= 11 (count res-multi))))
(testing "Successfully wrote status to log for restart-on-fail=false"
(is (= :complete (:status task-chunk-offset))))
(testing "Updates not written to log for restart-on-fail=true"
(is (= -1 (:chunk-index task-chunk-restart)))))
(is (= -1 (:chunk-index task-chunk-restart))))
(testing "Successfully processed all messages with failure"
(is (= 11 (count res-multi-fail)))))

0 comments on commit c25b8f2

Please sign in to comment.