diff --git a/test/onyx/plugin/input_test.clj b/test/onyx/plugin/input_test.clj index a947589..8a58fe7 100644 --- a/test/onyx/plugin/input_test.clj +++ b/test/onyx/plugin/input_test.clj @@ -48,6 +48,9 @@ (def workflow [[:read-messages :out]]) +(defn restartable? [e] + true) + ;; Create catalogs with different search scenarios (def catalog-base @@ -113,6 +116,17 @@ (def out-calls {:lifecycle/before-task-start inject-out-ch}) +(def batch-num (atom 0)) + +(def read-crash + {:lifecycle/before-batch + (fn [event lifecycle] + ; give the peer a bit of time to write the chunks out and ack the batches, + ; since we want to ensure that the batches aren't re-read on restart for ease of testing + (Thread/sleep 7000) + (when (= (swap! batch-num inc) 2) + (throw (ex-info "Restartable" {:restartable? true}))))}) + (def lifecycles [{:lifecycle/task :read-messages :lifecycle/calls :onyx.plugin.elasticsearch/read-messages-calls} @@ -121,23 +135,37 @@ {:lifecycle/task :out :lifecycle/calls :onyx.plugin.core-async/writer-calls}]) +(def lifecycles-fail + [{:lifecycle/task :read-messages + :lifecycle/calls :onyx.plugin.elasticsearch/read-messages-calls} + {:lifecycle/task :read-messages + :lifecycle/calls ::read-crash} + {:lifecycle/task :out + :lifecycle/calls ::out-calls} + {:lifecycle/task :out + :lifecycle/calls :onyx.plugin.core-async/writer-calls}]) + (def v-peers (onyx.api/start-peers 2 peer-group)) (defn submit-and-wait - [catalog] - (let [job-info (onyx.api/submit-job - peer-config - {:catalog catalog - :workflow workflow - :lifecycles lifecycles - :task-scheduler :onyx.task-scheduler/balanced})] - (onyx.api/await-job-completion peer-config (:job-id job-info)) - job-info)) + ([catalog] + (submit-and-wait catalog lifecycles)) + ([catalog lc] + (let [job-info (onyx.api/submit-job + peer-config + {:catalog catalog + :workflow workflow + :lifecycles lc + :task-scheduler :onyx.task-scheduler/balanced})] + (onyx.api/await-job-completion peer-config (:job-id job-info)) + job-info))) (defn run-job - [catalog] - (submit-and-wait catalog) - (take-segments! out-chan 5000)) + ([catalog] + (run-job catalog lifecycles)) + ([catalog lc] + (submit-and-wait catalog lc) + (take-segments! out-chan 5000))) (let [conn (u/connect-rest-client)] (esrd/create conn (.toString id) "_default_" {:foo "bar"}) @@ -164,6 +192,8 @@ (let [job-info-restart (submit-and-wait (update-in catalog-http-q&map&idx [0] assoc :elasticsearch/restart-on-fail true))] (def task-chunk-restart (extensions/read-chunk (:log env) :chunk (get-in job-info-restart [:task-ids :read-messages :id])))) + (def res-multi-fail (run-job (update-in catalog-http-q&map&idx [0] assoc :onyx/restart-pred-fn ::restartable?) lifecycles-fail)) + (u/delete-indexes (.toString id))) (doseq [v-peer v-peers] @@ -194,9 +224,11 @@ (is (= "bar" (-> res-native-idx first :_source :foo))))) (deftest fault-logic - (testing "Successfully processed all messages" + (testing "Successfully processed all messages no failure" (is (= 11 (count res-multi)))) (testing "Successfully wrote status to log for restart-on-fail=false" (is (= :complete (:status task-chunk-offset)))) (testing "Updates not written to log for restart-on-fail=true" - (is (= -1 (:chunk-index task-chunk-restart))))) \ No newline at end of file + (is (= -1 (:chunk-index task-chunk-restart)))) + (testing "Successfully processed all messages with failure" + (is (= 11 (count res-multi-fail))))) \ No newline at end of file