diff --git a/src/onyx/state/log/bookkeeper.clj b/src/onyx/state/log/bookkeeper.clj index 962d000ef..d882cdeec 100644 --- a/src/onyx/state/log/bookkeeper.clj +++ b/src/onyx/state/log/bookkeeper.clj @@ -2,7 +2,8 @@ (:require [onyx.log.curator :as curator] [taoensso.timbre :refer [info error warn trace fatal] :as timbre] [com.stuartsierra.component :as component] - [clojure.core.async :refer [chan timeout thread go >! !! alts!! close!]] + [clojure.core.async :refer [chan timeout thread go >! !! alts!! close! poll!]] + [clojure.core.async.impl.protocols :refer [closed?]] [onyx.compression.nippy :as nippy] [onyx.extensions :as extensions] [onyx.monitoring.measurements :refer [emit-latency-value emit-latency]] @@ -63,11 +64,11 @@ (def HandleWriteCallback (reify AsyncCallback$AddCallback - (addComplete [this rc lh entry-id ack-fn] + (addComplete [this rc lh entry-id [success-fn fail-fn]] (if (= rc (BKException$Code/OK)) - (ack-fn) - ;; TODO: should restart peer, see https://github.com/onyx-platform/onyx/issues/390 - (warn "Unable to complete async write to bookkeeper. BookKeeper exception code:" rc))))) + (success-fn) + (do (warn "Unable to complete async write to bookkeeper. BookKeeper exception code:" rc) + (fail-fn)))))) (defn compaction-transition "Transitions to a new compacted ledger, plus a newly created ledger created @@ -107,56 +108,51 @@ (.asyncAddEntry compacted-ledger compacted-serialized HandleWriteCallback - (fn [] - (close-handle previous-handle) - (emit-latency-value :window-log-compaction monitoring (- (System/currentTimeMillis) start-time)) - (>!! outbox-ch - {:fn :compact-bookkeeper-log-ids - :args {:job-id job-id - :task-id task-id - :slot-id slot-id - :peer-id id - :prev-ledger-ids (vec (butlast current-ids)) - :new-ledger-ids [compacted-ledger-id]}})))))))) + (list (fn [] + (close-handle previous-handle) + (emit-latency-value :window-log-compaction monitoring (- (System/currentTimeMillis) start-time)) + (>!! outbox-ch + {:fn :compact-bookkeeper-log-ids + :args {:job-id job-id + :task-id task-id + :slot-id slot-id + :peer-id id + :prev-ledger-ids (vec (butlast current-ids)) + :new-ledger-ids [compacted-ledger-id]}})) + (fn [] + (close! (:onyx.core/restart-ch event)))))))))) -(defn ch->type [ch batch-ch timeout-ch kill-ch task-kill-ch] - (cond (= ch timeout-ch) - :timeout - (or (= ch kill-ch) (= ch task-kill-ch)) - :shutdown - :else - :read)) +(defn take-write-batch [batch-size batch-ch] + (loop [entries [] ack-fns [] i 0] + (if (< i batch-size) + (let [[entry ack-fn] (poll! batch-ch)] + (if entry + (recur (conj entries entry) + (conj ack-fns ack-fn) + (inc i)) + [entries ack-fns])) + [entries ack-fns]))) -(defn take-write-batch [peer-opts batch-ch kill-ch task-kill-ch] - (let [batch-size (arg-or-default :onyx.bookkeeper/write-batch-size peer-opts) - timeout-ms (arg-or-default :onyx.bookkeeper/write-batch-timeout peer-opts) - timeout-ch (timeout timeout-ms)] - (loop [entries [] ack-fns [] i 0] - (if (< i batch-size) - (let [[[entry ack-fn] ch] (alts!! [kill-ch task-kill-ch batch-ch timeout-ch] :priority true)] - (if entry - (recur (conj entries entry) - (conj ack-fns ack-fn) - (inc i)) - (let [msg-type (ch->type ch batch-ch timeout-ch kill-ch task-kill-ch)] - [msg-type entries ack-fns]))) - [:read entries ack-fns])))) - -(defn process-batches [{:keys [ledger-handle next-ledger-handle batch-ch] :as log} +(defn process-batches [{:keys [ledger-handle next-ledger-handle batch-ch] :as log} {:keys [onyx.core/kill-ch onyx.core/task-kill-ch onyx.core/peer-opts] :as event}] - (thread - (loop [[result batch ack-fns] (take-write-batch peer-opts batch-ch kill-ch task-kill-ch)] - ;; Safe point to transition to the next ledger handle - (when @next-ledger-handle - (compaction-transition log event)) - (when-not (empty? batch) - (.asyncAddEntry ^LedgerHandle @ledger-handle - ^bytes (nippy/window-log-compress batch) - HandleWriteCallback - (fn [] (run! (fn [f] (f)) ack-fns)))) - (if-not (= :shutdown result) - (recur (take-write-batch peer-opts batch-ch kill-ch task-kill-ch)))) - (info "BookKeeper: shutting down batch processing"))) + (thread + (let [batch-size (arg-or-default :onyx.bookkeeper/write-batch-size peer-opts) + batch-backoff (arg-or-default :onyx.bookkeeper/write-batch-backoff peer-opts)] + (loop [[batch ack-fns] (take-write-batch batch-size batch-ch)] + ;; Safe point to transition to the next ledger handle + (when @next-ledger-handle + (compaction-transition log event)) + (if (empty? batch) + (Thread/sleep batch-backoff) + (.asyncAddEntry ^LedgerHandle @ledger-handle + ^bytes (nippy/window-log-compress batch) + HandleWriteCallback + (list (fn [] (run! (fn [f] (f)) ack-fns)) + (fn [] (close! (:onyx.core/restart-ch event)))))) + (if (and (not (closed? kill-ch)) + (not (closed? task-kill-ch))) + (recur (take-write-batch batch-size batch-ch))))) + (info "BookKeeper: shutting down batch processing"))) (defn assign-bookkeeper-log-id-spin [{:keys [onyx.core/peer-opts onyx.core/job-id onyx.core/task-id @@ -170,10 +166,16 @@ :task-id task-id :slot-id slot-id :ledger-id new-ledger-id}}) - (while (and (first (alts!! [kill-ch task-kill-ch] :default true)) - (not= new-ledger-id (last (event->ledger-ids event)))) - (info "New ledger id has not been published yet. Backing off.") - (Thread/sleep (arg-or-default :onyx.bookkeeper/ledger-id-written-back-off peer-opts))))) + (loop [] + (let [exit? (nil? (first (alts!! [kill-ch task-kill-ch] :default true))) + not-added? (not= new-ledger-id (last (event->ledger-ids event)))] + (cond exit? + (info "Exiting assign-bookkeeper-log-id-spin early as peer has been reassigned.") + not-added? + (do + (info "New ledger id has not been published yet. Backing off.") + (Thread/sleep (arg-or-default :onyx.bookkeeper/ledger-id-written-back-off peer-opts)) + (recur))))))) (defmethod state-extensions/initialize-log :bookkeeper [log-type {:keys [onyx.core/peer-opts] :as event}] (let [bk-client (bookkeeper peer-opts) @@ -268,7 +270,8 @@ (defmethod state-extensions/close-log onyx.state.log.bookkeeper.BookKeeperLog [{:keys [client ledger-handle next-ledger-handle]} event] (try - (close-handle @ledger-handle) + (when @ledger-handle + (close-handle @ledger-handle)) (when @next-ledger-handle (close-handle @next-ledger-handle)) (catch Throwable t