Skip to content

Commit

Permalink
Split out serialization and deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jasongraffius committed Aug 5, 2017
1 parent 356d07d commit 370b7a1
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions src/kafunc/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
"A function which takes an argument of a byte array, and returns an object.
The object returned must return a similar byte array when passed to
*deserializer*. The default function introduces no dependencies, but also
makesf no guarantees of efficiency."
makes no guarantees of efficiency."
interop/io-serialize)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Functions

(defn- update-record-kv
[record f]
[f record]
(-> record
(update :key f)
(update :value f)))
Expand Down Expand Up @@ -76,16 +76,22 @@
[consumer]
(interop/consumer-subscriptions consumer))

(defn deserialize-records
"Deserialize keys and values for all records in a collection."
[xs & [deserializer]]
(let [deserialize (fnil (or deserializer *deserializer* identity) nil)]
(map (partial update-record-kv deserialize) xs)))

(defn next-records
"Retrieves a collection of the next available records. Blocking.
See consumer->record-seq for details of what a record contains. Optionally
accepts a timeout value, which will cap the amount of milliseconds that this
function will block. By default, it should block for Long/MAX_VALUE
milliseconds."
[consumer & [timeout]]
milliseconds, which should effectively not timeout."
[consumer & {:keys [timeout deserializer]}]
(let [timeout (or timeout Long/MAX_VALUE)]
(interop/poll consumer timeout)))
(deserialize-records (interop/poll consumer timeout))))

(defn consumer->record-seq
"Create an infinite lazy seq which contains records consumed by a consumer.
Expand All @@ -102,12 +108,9 @@
value is bound during the creation of the seq, so the binding does not need
to be maintained for the lifetime of the seq."
[consumer & [deserializer]]
(let [deserialize (or deserializer *deserializer*)
extract (fnil deserialize nil)]
(lazy-cat
(->> (next-records consumer)
(update-record-kv extract))
(consumer->record-seq consumer deserializer))))
(lazy-cat
(next-records consumer :deserializer deserializer)
(consumer->record-seq consumer deserializer)))

(defn record-seq->value-seq
"Creates a lazy seq of values contained within the records in record-seq."
Expand Down Expand Up @@ -160,6 +163,12 @@
*producer-config*
config)))

(defn serialize-records

[xs & [serializer]]
(let [serialize (fnil (or serializer *serializer* identity) nil)]
(map (partial update-record-kv serialize) xs)))

(defn send-records
"Send a seq of producer-records to their destinations topic and partition.
Expand All @@ -174,11 +183,9 @@
The sending is eager, but the retreiving of metadata from the results is lazy.
This allows asynchronous sends to simply ignore this metadata."
[record-seq & [producer serializer]]
(let [producer (or producer (make-producer))
serialize (fnil (or serializer *serializer*) nil)
inject (partial (util/flipped update-record-kv) serialize)]
(let [producer (or producer (make-producer))]
(map merge
record-seq
(map (comp interop/record-meta->map deref)
(doall (map (comp (partial interop/send producer) inject)
record-seq))))))
(doall (map (partial interop/send producer)
(serialize-records record-seq)))))))

0 comments on commit 370b7a1

Please sign in to comment.