diff --git a/src/jackdaw/admin.clj b/src/jackdaw/admin.clj index fab15554..7d0ca4c1 100644 --- a/src/jackdaw/admin.clj +++ b/src/jackdaw/admin.clj @@ -158,7 +158,7 @@ {:pre [(client? client) (sequential? topics)]} (->> @(describe-topics* client (map :topic-name topics)) - (every? (fn [[topic-name {:keys [partition-info]}]] + (every? (fn [[_topic-name {:keys [partition-info]}]] (every? (fn [part-info] (and (boolean (:leader part-info)) (seq (:isr part-info)))) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index 232e316b..cbcd0bd7 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -46,7 +46,7 @@ Callbacks are `void`, so the return value is ignored." ^Callback [on-completion] (reify Callback - (onCompletion [this record-meta exception] + (onCompletion [_this record-meta exception] (on-completion record-meta exception)))) (defn send! diff --git a/src/jackdaw/client/partitioning.clj b/src/jackdaw/client/partitioning.clj index 234af265..2b07550a 100644 --- a/src/jackdaw/client/partitioning.clj +++ b/src/jackdaw/client/partitioning.clj @@ -64,7 +64,7 @@ (defn default-partition "The kafka default partitioner. As a `::partition-fn`" - [{:keys [topic-name key-serde]} key value partitions] + [{:keys [topic-name key-serde]} key _value partitions] (let [key-bytes (.serialize (.serializer ^Serde key-serde) topic-name key)] (default-partitioner* key-bytes partitions))) @@ -92,11 +92,11 @@ (partition-fn t key value %) (->ProducerRecord producer t % key value)) (jd/->ProducerRecord t key value))) - ([^Producer producer topic partition key value] + ([^Producer _producer topic partition key value] (jd/->ProducerRecord topic (int partition) key value)) - ([^Producer producer topic partition timestamp key value] + ([^Producer _producer topic partition timestamp key value] (jd/->ProducerRecord topic partition timestamp key value)) - ([^Producer producer topic partition timestamp key value headers] + ([^Producer _producer topic partition timestamp key value headers] (jd/->ProducerRecord topic partition timestamp key value headers))) (defn produce! @@ -108,15 +108,15 @@ ([producer topic value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic key value] + ([producer topic _key value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic partition key value] + ([producer topic partition _key value] (jc/send! producer (->ProducerRecord producer topic partition topic value))) - ([producer topic partition timestamp key value] + ([producer topic partition timestamp _key value] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value))) - ([producer topic partition timestamp key value headers] + ([producer topic partition timestamp _key value headers] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value headers)))) diff --git a/src/jackdaw/data/common.clj b/src/jackdaw/data/common.clj index 1cf23934..054d1d57 100644 --- a/src/jackdaw/data/common.clj +++ b/src/jackdaw/data/common.clj @@ -44,12 +44,10 @@ (TopicPartition. topic-name (int partition))) (defn map->TopicPartition - "Given a `::topic-parititon`, build an equivalent `TopicPartition`. + "Given a `topic-partition`, build an equivalent `TopicPartition`. Inverts `(datafy ^TopicPartition tp)`." - [{:keys [topic-name - partition] - :as m}] + [{:keys [partition] :as m}] (->TopicPartition m partition)) (defn->data TopicPartition->data [^TopicPartition tp] diff --git a/src/jackdaw/data/consumer.clj b/src/jackdaw/data/consumer.clj index c6010f66..04a8c485 100644 --- a/src/jackdaw/data/consumer.clj +++ b/src/jackdaw/data/consumer.clj @@ -81,8 +81,7 @@ {:offset (.offset ots) :timestamp (.timestamp ots)}) -(defn map->OffsetAndTimestamp - [{:keys [offset timestamp] :as m}] +(defn map->OffsetAndTimestamp [m] (->OffsetAndTimestamp m)) (defn as-OffsetAndTimestamp diff --git a/src/jackdaw/data/producer.clj b/src/jackdaw/data/producer.clj index 770fd990..87c61f32 100644 --- a/src/jackdaw/data/producer.clj +++ b/src/jackdaw/data/producer.clj @@ -82,14 +82,14 @@ required. The third arity allows a user to provide a checksum. This arity may be removed in the future pending further breaking changes to the Kafka APIs." - ([{:keys [:topic-name] :as t} partition offset timestamp key-size value-size] + ([t partition offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) offset 0 ;; Force absolute offset timestamp nil ;; No checksum, it's deprecated ^Integer (when key-size (int key-size)) ^Integer (when value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp + ([t partition base-offset relative-offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset @@ -98,7 +98,7 @@ nil ;; No checksum, it's depreciated ^Integer (when key-size (int key-size)) ^Integer (when value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp checksum + ([t partition base-offset relative-offset timestamp checksum key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index 1f52026f..b53616ca 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -198,7 +198,7 @@ (try (and (number? x) (coercion-fn (bigint x))) - (catch RuntimeException e + (catch RuntimeException _e false))) (defrecord DoubleType [] @@ -260,12 +260,14 @@ (defrecord SchemalessType [] SchemaCoercion - (match-clj? [_ x] + (match-clj? [_schema-type _clj-data] true) - (match-avro? [_ x] + (match-avro? [_schema-type _avro-data] true) - (avro->clj [_ x] x) - (clj->avro [_ x path] x)) + (avro->clj [_schema-type avro-data] + avro-data) + (clj->avro [_schema-type clj-data _path] + clj-data)) ;; UUID :disapprove: @@ -278,7 +280,7 @@ (avro->clj [_ uuid-utf8] (try (UUID/fromString (str uuid-utf8)) - (catch Exception e + (catch Exception _e (str uuid-utf8)))) (clj->avro [this uuid path] (validate-clj! this uuid path "uuid") @@ -639,17 +641,14 @@ (get @coercion-cache avro-schema))))) (defn- coercion-type - [avro-schema {:keys [type-registry - coercion-cache] :as coercion-stack}] + [avro-schema coercion-stack] ((schema->coercion coercion-stack) avro-schema)) (defn as-json "Returns the json representation of the supplied `edn+avro` `edn+avro` is an avro object represented as an edn object (compatible with the jackdaw avro serde)" - [{:keys [type-registry - avro-schema - coercion-cache] :as coercion-stack} edn+avro] + [{:keys [avro-schema] :as coercion-stack} edn+avro] (let [schema (parse-schema-str avro-schema) record (clj->avro (coercion-type schema coercion-stack) edn+avro []) out-stream (ByteArrayOutputStream.) @@ -665,9 +664,7 @@ "Returns the edn representation of the supplied `json+avro` `json+avro` is an avro object represented as a json string" - [{:keys [type-registry - coercion-cache - avro-schema] :as coercion-stack} json+avro] + [{:keys [avro-schema] :as coercion-stack} json+avro] (let [schema (parse-schema-str avro-schema) decoder (.jsonDecoder ^DecoderFactory (DecoderFactory.) ^Schema schema diff --git a/src/jackdaw/streams/extras.clj b/src/jackdaw/streams/extras.clj index 1bf75ed9..7646702c 100644 --- a/src/jackdaw/streams/extras.clj +++ b/src/jackdaw/streams/extras.clj @@ -46,15 +46,15 @@ (^void onBatchRestored [_ ^TopicPartition topicPartition - ^String storeName - ^long batchEndOffset - ^long numRestored] + ^String _storeName + ^long _batchEndOffset + ^long _numRestored] (log/warnf "Restored a batch from (%s.%d)" (.topic topicPartition) (.partition topicPartition))) (^void onRestoreEnd [_ ^TopicPartition topicPartition ^String storeName - ^long totalRestored] + ^long _totalRestored] (let [start-date (get @restore-tracker storeName) elapsed-sec (.getSeconds (Duration/between start-date (Instant/now)))] diff --git a/src/jackdaw/streams/lambdas.clj b/src/jackdaw/streams/lambdas.clj index 7f2341e9..a6f4d0b6 100644 --- a/src/jackdaw/streams/lambdas.clj +++ b/src/jackdaw/streams/lambdas.clj @@ -18,7 +18,7 @@ (deftype FnAggregator [aggregator-fn] Aggregator - (apply [this agg-key value aggregate] + (apply [_this agg-key value aggregate] (aggregator-fn aggregate [agg-key value]))) (defn aggregator @@ -28,7 +28,7 @@ (deftype FnForeachAction [foreach-action-fn] ForeachAction - (apply [this key value] + (apply [_this key value] (foreach-action-fn [key value]) nil)) @@ -39,7 +39,7 @@ (deftype FnInitializer [initializer-fn] Initializer - (apply [this] + (apply [_this] (initializer-fn))) (defn initializer @@ -49,7 +49,7 @@ (deftype FnKeyValueMapper [key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (key-value (key-value-mapper-fn [key value])))) (defn key-value-mapper @@ -59,7 +59,7 @@ (deftype FnSelectKeyValueMapper [select-key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (select-key-value-mapper-fn [key value]))) (defn select-key-value-mapper @@ -70,7 +70,7 @@ (deftype FnKeyValueFlatMapper [key-value-flatmapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (mapv key-value (key-value-flatmapper-fn [key value])))) (defn key-value-flatmapper @@ -83,7 +83,7 @@ (deftype FnMerger [merger-fn] Merger - (apply [this agg-key aggregate1 aggregate2] + (apply [_this agg-key aggregate1 aggregate2] (merger-fn agg-key aggregate1 aggregate2))) (defn merger @@ -93,7 +93,7 @@ (deftype FnPredicate [predicate-fn] Predicate - (test [this key value] + (test [_this key value] (boolean (predicate-fn [key value])))) (defn predicate @@ -103,7 +103,7 @@ (deftype FnReducer [reducer-fn] Reducer - (apply [this value1 value2] + (apply [_this value1 value2] (reducer-fn value1 value2))) (defn reducer @@ -113,7 +113,7 @@ (deftype FnValueJoiner [value-joiner-fn] ValueJoiner - (apply [this value1 value2] + (apply [_this value1 value2] (value-joiner-fn value1 value2))) (defn value-joiner @@ -123,7 +123,7 @@ (deftype FnValueMapper [value-mapper-fn] ValueMapper - (apply [this value] + (apply [_this value] (value-mapper-fn value))) (defn value-mapper @@ -133,7 +133,7 @@ (deftype FnStreamPartitioner [stream-partitioner-fn] StreamPartitioner - (partition [this topic-name key val partition-count] + (partition [_this topic-name key val partition-count] (stream-partitioner-fn topic-name key val partition-count))) (defn stream-partitioner @@ -157,7 +157,7 @@ (deftype FnProcessorSupplier [processor-supplier-fn] ProcessorSupplier - (get [this] + (get [_this] (processor processor-supplier-fn))) (defn processor-supplier @@ -167,7 +167,7 @@ (deftype FnTransformerSupplier [transformer-supplier-fn] TransformerSupplier - (get [this] + (get [_this] (transformer-supplier-fn))) (defn transformer-supplier @@ -177,7 +177,7 @@ (deftype FnValueTransformerSupplier [value-transformer-supplier-fn] ValueTransformerSupplier - (get [this] + (get [_this] (value-transformer-supplier-fn))) (defn value-transformer-supplier @@ -187,10 +187,10 @@ (deftype FnTransformer [context xfm-fn] Transformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this k v] + (close [_this]) + (transform [_this k v] (xfm-fn @context k v))) (defn transformer-with-ctx @@ -211,10 +211,10 @@ (deftype FnValueTransformer [context xfm-fn] ValueTransformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this v] + (close [_this]) + (transform [_this v] (xfm-fn @context v))) (defn value-transformer-with-ctx diff --git a/src/jackdaw/test.clj b/src/jackdaw/test.clj index d2f39965..93133d3b 100644 --- a/src/jackdaw/test.clj +++ b/src/jackdaw/test.clj @@ -76,7 +76,7 @@ consumer producer] java.io.Closeable - (close [this] + (close [_this] (doseq [hook exit-hooks] (hook)) (log/info "destroyed test machine"))) diff --git a/src/jackdaw/test/commands/base.clj b/src/jackdaw/test/commands/base.clj index 579c1185..502feed6 100644 --- a/src/jackdaw/test/commands/base.clj +++ b/src/jackdaw/test/commands/base.clj @@ -7,13 +7,13 @@ (def command-map {:stop (constantly true) - :sleep (fn [machine [sleep-ms]] + :sleep (fn [_machine [sleep-ms]] (Thread/sleep sleep-ms)) - :println (fn [machine params] + :println (fn [_machine params] (println (apply str params))) - :pprint (fn [machine params] + :pprint (fn [_machine params] (pprint/pprint params)) :do (fn [machine [do-fn]] diff --git a/src/jackdaw/test/commands/write.clj b/src/jackdaw/test/commands/write.clj index aed0eb21..6f77440a 100644 --- a/src/jackdaw/test/commands/write.clj +++ b/src/jackdaw/test/commands/write.clj @@ -5,7 +5,7 @@ (set! *warn-on-reflection* true) -(defn default-partition-fn [topic-map topic-name k v partition-count] +(defn default-partition-fn [topic-map _topic-name k _v partition-count] (int (partitioning/default-partition topic-map k nil partition-count))) (defn create-message [topic-map message opts] diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 67c03a16..354904ad 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -30,27 +30,27 @@ (defn- create-topics "Creates " - [client kafka-config topic-config] + [client topic-config] (let [required (->> topic-config - (filter (fn [[k v]] + (filter (fn [[_k v]] (not (.contains (-> (list-topics client) .names .get) (:topic-name v))))) - (map (fn [[k v]] + (map (fn [[_k v]] (new-topic v))))] (-> (.createTopics client required) (.all)))) (defn- delete-topics - [client kafka-config topic-config] + [client topic-config] (let [deletable (->> topic-config - (filter (fn [[k v]] + (filter (fn [[_k v]] (.contains (-> (list-topics client) .names .get) (:topic-name v)))) - (map (fn [[k v]] + (map (fn [[_k v]] (:topic-name v))))] (-> (.deleteTopics client deletable) (.all)))) @@ -64,7 +64,7 @@ ([kafka-config topic-config timeout-ms] (fn [t] (with-open [client (AdminClient/create kafka-config)] - (-> (create-topics client kafka-config topic-config) + (-> (create-topics client topic-config) (.get timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)) (log/info "topic-fixture: created topics: " (keys topic-config)) (t))))) @@ -111,7 +111,7 @@ (defn- set-error [error] (reify Thread$UncaughtExceptionHandler - (uncaughtException [_ t e] + (uncaughtException [_this _thread e] (log/error e (.getMessage e)) (reset! error e)))) @@ -218,7 +218,6 @@ (if-not (class-exists? 'kafka.tools.StreamsResetter) (throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool")) (let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter")) - app-id (get app-config "application.id") args (concat ["--application-id" (get app-config "application.id") "--bootstrap-servers" (get app-config "bootstrap.servers")] reset-args) diff --git a/src/jackdaw/test/journal.clj b/src/jackdaw/test/journal.clj index d703ee84..ae1a10d7 100644 --- a/src/jackdaw/test/journal.clj +++ b/src/jackdaw/test/journal.clj @@ -23,7 +23,7 @@ (remove-watch journal id) (deliver p {:result :found :info result})))] - (add-watch journal id (fn [k r old new] + (add-watch journal id (fn [_k _r _old new] (check-condition new))) ;; don't rely on watcher to 'check-condition' ;; in case journal is already in a final, good state diff --git a/src/jackdaw/test/serde.clj b/src/jackdaw/test/serde.clj index 23933509..e2d7f9e5 100644 --- a/src/jackdaw/test/serde.clj +++ b/src/jackdaw/test/serde.clj @@ -90,7 +90,7 @@ "Returns a map of topics to the corresponding deserializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (deserializer v)])) (into {}))) @@ -99,7 +99,7 @@ "Returns a map of topic to the corresponding serializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (serializer v)])) (into {}))) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index f4aa376a..55f9598a 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -116,7 +116,7 @@ {:process (d/loop [consumer (subscription kafka-config (vals topic-metadata))] (d/chain (d/future consumer) - (fn [c] + (fn [_c] (when-not (realized? started?) (deliver started? true) (log/infof "started kafka consumer: %s" @@ -174,7 +174,7 @@ (defn producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([kafka-config topic-config serializers] + ([kafka-config _topic-config serializers] (let [producer (kafka/producer kafka-config byte-array-serde) messages (s/stream 1 (map (fn [x] (try diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index c7d13cea..a2440fa5 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -109,7 +109,7 @@ poll (poller messages topic-config)] {:process (d/loop [cont? @continue?] - (d/chain cont? (fn [d] + (d/chain cont? (fn [_d] (when-not (realized? started?) (log/info "started mock consumer: %s" {:driver driver}) (deliver started? true)) diff --git a/src/jackdaw/test/transports/rest_proxy.clj b/src/jackdaw/test/transports/rest_proxy.clj index 570b9be8..dd6716d3 100644 --- a/src/jackdaw/test/transports/rest_proxy.clj +++ b/src/jackdaw/test/transports/rest_proxy.clj @@ -164,13 +164,11 @@ (assoc client :base-uri base-uri, :instance-id instance-id)))))))) (defn with-subscription - [{:keys [base-uri group-id instance-id] :as client} topic-metadata] + [{:keys [base-uri] :as client} topic-metadata] (let [url (format "%s/subscription" base-uri) topics (map :topic-name (vals topic-metadata)) headers {"Accept" (content-types :json) - "Content-Type" (content-types :json)} - body {:topics topics}] - + "Content-Type" (content-types :json)}] (d/chain (handle-proxy-request (:post *http-client*) url headers {:topics topics}) (fn [response] (if (:error response) @@ -182,7 +180,7 @@ "Returns a function that takes a consumer and puts any messages retrieved by polling it onto the supplied `messages` channel" [consumer] - (let [{:keys [base-uri group-id instance-id]} consumer + (let [{:keys [base-uri]} consumer url (format "%s/records" base-uri) headers {"Accept" (content-types :byte-array)} body nil] @@ -237,7 +235,7 @@ (s/close! messages) (destroy-consumer client) (log/infof "stopped rest-proxy consumer: %s" (proxy-client-info client)))) - (d/chain client (fn [client] + (d/chain client (fn [_client] (s/put-all! messages msgs) (log/infof "collected %s messages from kafka" (count msgs)) (Thread/sleep 500) @@ -271,7 +269,7 @@ (defn rest-proxy-producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([config topics serializers] + ([config _topics serializers] (let [producer (rest-proxy-client config) messages (s/stream 1 (map (fn [x] (try diff --git a/test/jackdaw/admin_test.clj b/test/jackdaw/admin_test.clj index eeb4ab09..ba5dba8b 100644 --- a/test/jackdaw/admin_test.clj +++ b/test/jackdaw/admin_test.clj @@ -13,9 +13,9 @@ (extend MockAdminClient admin/Client (-> admin/client-impl - (merge {:alter-topics* (fn [this topics] + (merge {:alter-topics* (fn [_this topics] (d/future [:altered topics])) - :describe-configs* (fn [this configs] + :describe-configs* (fn [_this configs] (d/future (into {} (map #(vector % {"some-key" "some-value"}) configs))))}))) @@ -45,12 +45,11 @@ (def test-cluster (take 3 (node-seq 0 "test-host"))) (defn with-mock-admin-client [cluster f] - (let [effects (atom []) - client (MockAdminClient. cluster (first cluster))] + (let [client (MockAdminClient. cluster (first cluster))] (f client))) (deftest test-new-topic - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (let [t (data/map->NewTopic info) msg (format "cannot create topic from %s" t)] (is (instance? org.apache.kafka.clients.admin.NewTopic t) msg)))) @@ -73,7 +72,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (is (admin/topic-exists? client info)))))) (deftest test-retry-exists? @@ -99,7 +98,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[topic-name topic-info] (admin/describe-topics client)] + (doseq [[_topic-name topic-info] (admin/describe-topics client)] (is (set= [:is-internal? :partition-info] (keys topic-info))))))) diff --git a/test/jackdaw/client_test.clj b/test/jackdaw/client_test.clj index b786c37e..9e2eee14 100644 --- a/test/jackdaw/client_test.clj +++ b/test/jackdaw/client_test.clj @@ -86,7 +86,7 @@ (testing "producer callbacks" (testing "success" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok))))] @@ -96,7 +96,7 @@ (testing "failure" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok)))) @@ -117,7 +117,7 @@ (testing "send with callback" (let [msg (data/->ProducerRecord {:topic-name "foo"} "1" "one") on-callback (promise) - result (client/send! producer msg (fn [meta ex] + result (client/send! producer msg (fn [_meta ex] (if ex (deliver on-callback ex) (deliver on-callback :ok))))] @@ -195,47 +195,44 @@ (deftest ^:integration partitions-for-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (testing "partition info" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "partition info" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (let [[pinfo] (-> (client/partitions-for consumer bar-topic) (data/datafy))] (is (response-ok? :partitions-for pinfo)))))) - (testing "single-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "single-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (is (= 1 (client/num-partitions consumer bar-topic)))))) - (testing "multi-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [high-partition-topic])) + (testing "multi-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [high-partition-topic])) (fn [consumer] (is (= 15 (client/num-partitions consumer high-partition-topic)))))) - (testing "single-partition producer" - (with-producer (client/producer (producer-config)) + (testing "single-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] (is (= 1 (client/num-partitions producer bar-topic)))))) - (testing "multi-partition producer" - (with-producer (client/producer (producer-config)) + (testing "multi-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] - (is (= 15 (client/num-partitions producer high-partition-topic))))))))) + (is (= 15 (client/num-partitions producer high-partition-topic)))))))) (defn mock-consumer "Returns a consumer that will return the supplied items (as ConsumerRecords) in response to successive calls of the `poll` method" [queue] (reify Consumer - (^ConsumerRecords poll [this ^long ms] + (^ConsumerRecords poll [_this ^long ms] (.poll queue ms TimeUnit/MILLISECONDS)) - (^ConsumerRecords poll [this ^Duration duration] + (^ConsumerRecords poll [_this ^Duration duration] (.poll queue (.toMillis duration) TimeUnit/MILLISECONDS)))) (defn poll-result [topic data] @@ -251,27 +248,24 @@ consumer (mock-consumer q)] (.put q (poll-result "test-topic" [[1 1] [2 2]])) (let [results (client/poll consumer 1000)] - (are [k v] (first results) + (are [_k _v] (first results) :topic "test-topic" :key 1 :value 1) - (are [k v] (second results) + (are [_k _v] (second results) :topic "test-topic" :key 2 :value 2)))) (deftest ^:integration position-all-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] ;; without an initial `poll`, there is no position info (client/poll consumer 0) (is (= {{:topic-name "bar" :partition 0} 0} - (client/position-all consumer)))))))) + (client/position-all consumer))))))) (defn with-topic-data "Helper for creating a randomly named topic and seeding it with data diff --git a/test/jackdaw/serdes/fressian_test.clj b/test/jackdaw/serdes/fressian_test.clj index d52b2489..86d2bae2 100644 --- a/test/jackdaw/serdes/fressian_test.clj +++ b/test/jackdaw/serdes/fressian_test.clj @@ -38,7 +38,7 @@ (def read-handlers (-> (merge {uri-tag (reify ReadHandler - (read [_ reader tag component-count] + (read [_ reader _tag _component-count] (URI. (.readObject reader))))} fressian/clojure-read-handlers) fressian/associative-lookup)) diff --git a/test/jackdaw/serdes/json_test.clj b/test/jackdaw/serdes/json_test.clj index 9bfa598c..c0a4e357 100644 --- a/test/jackdaw/serdes/json_test.clj +++ b/test/jackdaw/serdes/json_test.clj @@ -26,8 +26,7 @@ (deftest reverse-json-roundtrip-test (testing "JSON bytes are the same after deserialization and serialization." - (let [s (slurp (io/resource "resources/pass1.json")) - b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] + (let [b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] (is (= (into [] b) (into [] (->> (.deserialize (jsj/deserializer) nil b) (.serialize (jsj/serializer) nil)))))))) diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 330a8cd6..94a95e3e 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -146,7 +146,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter (fn [[k v]] (> v 1))) + (k/filter (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -161,7 +161,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter-not (fn [[k v]] (> v 1))) + (k/filter-not (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -261,7 +261,7 @@ driver (mock/build-driver (fn [builder] (let [[pos-stream neg-stream] (-> builder (k/kstream topic-a) - (k/branch [(fn [[k v]] + (k/branch [(fn [[_k v]] (<= 0 v)) (constantly true)]))] (k/to pos-stream topic-pos) @@ -397,7 +397,7 @@ records (atom []) driver (mock/build-driver (fn [builder] (-> (k/kstream builder topic-a) - (k/process! (fn [ctx k v] + (k/process! (fn [_ctx _k v] (swap! records conj v)) [])))) publish-a (partial mock/publish driver topic-a)] @@ -411,7 +411,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/select-key (fn [[k v]] + (k/select-key (fn [[k _v]] (inc k))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -564,7 +564,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter (fn [[k v]] + (k/filter (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -584,7 +584,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter-not (fn [[k v]] + (k/filter-not (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -911,7 +911,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce + topic-a) (k/to-kstream) (k/to topic-b)))) @@ -933,7 +933,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce +) (k/to-kstream) (k/to topic-b)))) @@ -955,9 +955,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v)) + (fn [acc [_k v]] (+ acc v)) topic-a) (k/to-kstream) (k/to topic-b)))) @@ -1006,9 +1006,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v))) + (fn [acc [_k v]] (+ acc v))) (k/to-kstream) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -1032,7 +1032,7 @@ (-> in (k/group-by-key) (k/aggregate (constantly []) - (fn [acc [k v]] + (fn [acc [_k v]] (concat [(last acc)] [v])) (assoc topic-in @@ -1059,7 +1059,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-time (TimeWindows/of 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1107,7 +1107,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-session (SessionWindows/with 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1137,10 +1137,10 @@ (k/group-by-key) (k/window-by-session (SessionWindows/with 1000)) (k/aggregate (constantly 0) - (fn [agg [k v]] + (fn [agg [_k v]] (+ agg v)) ;; Merger - (fn [k agg1 agg2] + (fn [_k agg1 agg2] (+ agg1 agg2)) topic-a) (k/to-kstream) @@ -1175,8 +1175,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v)) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v)) topic-b) (k/to-kstream) (k/to topic-b))))] @@ -1232,8 +1232,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v))) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v))) (k/to-kstream) (k/to topic-b))))] (let [publish (partial mock/publish driver topic-a)] @@ -1336,7 +1336,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/join-global k-table - (fn [[k v]] + (fn [[k _v]] k) +) (k/to topic-c)))))] @@ -1359,7 +1359,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/left-join-global k-table - (fn [[k v]] + (fn [[k _v]] k) safe-add) (k/to topic-c)))))] @@ -1392,7 +1392,7 @@ (publisher 100 {:val 10}) - (let [[[k v]] (mock/get-keyvals driver output-t)] + (let [[[_k v]] (mock/get-keyvals driver output-t)] (is (= 11 (:new-val v))) (is (= "input-topic" (:topic v))))))))) diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index d8812135..51eae7d1 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -114,7 +114,6 @@ :partition-count 5 :key-serde :long :value-serde :json}) - opts {} msg {:id 1 :a 2 :b 3 :payload "yolo"}] (testing "partition must be >= 0" diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 9e6875ad..5825623d 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -80,7 +80,7 @@ (.write *err* "helpful error message\n") (.write *out* "essential application info\n") 1)} - (fn [{:keys [resetter reset-args error-data]}] + (fn [{:keys [resetter error-data]}] (is (instance? kafka.tools.StreamsResetter resetter)) (is (= 1 (:status error-data))) (is (= "helpful error message\n" (:err error-data))) diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index c5d5cf4c..aa100db5 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -81,7 +81,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -107,7 +106,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index f53ddf3e..04cd7ae3 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -74,7 +74,7 @@ (deftest test-driver-closed-after-use (let [driver-closed? (atom false) driver (reify java.io.Closeable - (close [this] + (close [_this] (reset! driver-closed? true))) transport (trns/transport {:type :mock :driver driver @@ -90,7 +90,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -112,7 +111,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index 5377ed47..d741132d 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -101,7 +101,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -127,7 +126,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index 05db7e55..b188b145 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -57,7 +57,7 @@ (deftest test-run-test (testing "the run test machinery" - (let [m {:executor (-> (fn [m c] + (let [m {:executor (-> (fn [_machine c] (let [[cmd & params] c] (apply ({:min (fn [v] {:result {:result (apply min v)}}) :max (fn [v] {:result {:result (apply max v)}}) @@ -70,7 +70,7 @@ :journal (atom {})}] (testing "works properly" - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:max [1 2 3]] [:is-1 1]])] @@ -88,7 +88,7 @@ (testing "execution stops on an unknown command" (is (thrown? NullPointerException - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:foo 2] [:max [1 2 3]]])] @@ -216,17 +216,17 @@ [in out] (fn [builder] (let [in (-> (k/kstream builder in) - (k/map (fn [[k v]] + (k/map (fn [_record] (throw (ex-info "bad topology" {})))))] (k/to in out) builder))) (defn bad-key-fn - [msg] + [_msg] (throw (ex-info "bad-key-fn" {}))) (defn bad-watch-fn - [journal] + [_journal] (throw (ex-info "bad-watch-fn" {}))) (deftest test-machine-happy-path