diff --git a/.circleci/config.yml b/.circleci/config.yml index efdee78d..63ac6e43 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -100,6 +100,14 @@ jobs: key: v1-jackdaw-repo-{{ .Branch }}-{{ .Revision }} paths: - . + lint: + executor: machine + working_directory: /home/circleci/jackdaw + steps: + - checkout + - run: ls -la + - run: docker run --volume `pwd`:/project --rm --workdir /project cljkondo/clj-kondo sh -c 'clj-kondo --lint src test' || true + deps: <<: *build_config steps: @@ -111,6 +119,8 @@ jobs: key: *mvn_cache_key paths: - /home/circleci/.m2 + + test: <<: *test_config steps: @@ -159,6 +169,7 @@ workflows: version: 2 build_and_test: jobs: + - lint - checkout_code - deps: requires: diff --git a/.clj-kondo/config.edn b/.clj-kondo/config.edn new file mode 100644 index 00000000..93403c2c --- /dev/null +++ b/.clj-kondo/config.edn @@ -0,0 +1,10 @@ +{:linters {:unused-binding { ;; ignore unused :as binding. + :exclude-destructured-as true} + :unresolved-symbol { ;; `thrown-with-msg-and-data?` is a legit extension to the `is` macro + ;; via an `assert-expr` defmethod (see clojure.test doc) + :exclude [(clojure.test/is [thrown-with-msg-and-data?])]}} + + :lint-as {clojure.test.check.clojure-test/defspec clojure.core/def + jackdaw.data/defn->data clojure.core/defn + jackdaw.test.transports/deftransport clojure.core/defn + manifold.deferred/loop clojure.core/let}} \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index ff993e86..d474a367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +### Unreleased + +- Add clj-kondo and fix all lint warnings and errors [#323](https://github.com/FundingCircle/jackdaw/pull/323) + ### [0.9.5] - [2022-05-26] * Move away from deprecated class ConsumerRecordFactory (to prepare migration to Kafka Streams 3.2.0) diff --git a/src/jackdaw/admin.clj b/src/jackdaw/admin.clj index fab15554..7d0ca4c1 100644 --- a/src/jackdaw/admin.clj +++ b/src/jackdaw/admin.clj @@ -158,7 +158,7 @@ {:pre [(client? client) (sequential? topics)]} (->> @(describe-topics* client (map :topic-name topics)) - (every? (fn [[topic-name {:keys [partition-info]}]] + (every? (fn [[_topic-name {:keys [partition-info]}]] (every? (fn [part-info] (and (boolean (:leader part-info)) (seq (:isr part-info)))) diff --git a/src/jackdaw/client.clj b/src/jackdaw/client.clj index bc5d08bc..cbcd0bd7 100644 --- a/src/jackdaw/client.clj +++ b/src/jackdaw/client.clj @@ -26,16 +26,16 @@ ;;;; Producer -(defn ^KafkaProducer producer +(defn producer "Return a producer with the supplied properties and optional Serdes." - ([config] + (^KafkaProducer [config] (KafkaProducer. ^java.util.Properties (jd/map->Properties config))) - ([config {:keys [^Serde key-serde ^Serde value-serde]}] + (^KafkaProducer [config {:keys [^Serde key-serde ^Serde value-serde]}] (KafkaProducer. ^java.util.Properties (jd/map->Properties config) (.serializer key-serde) (.serializer value-serde)))) -(defn ^Callback callback +(defn callback "Return a kafka `Callback` function out of a clojure `fn`. The fn must be of 2-arity, being `[record-metadata?, ex?]` where the @@ -44,9 +44,9 @@ the record. Callbacks are `void`, so the return value is ignored." - [on-completion] + ^Callback [on-completion] (reify Callback - (onCompletion [this record-meta exception] + (onCompletion [_this record-meta exception] (on-completion record-meta exception)))) (defn send! @@ -89,11 +89,11 @@ ;;;; Consumer -(defn ^KafkaConsumer consumer +(defn consumer "Return a consumer with the supplied properties and optional Serdes." - ([config] + (^KafkaConsumer [config] (KafkaConsumer. ^java.util.Properties (jd/map->Properties config))) - ([config {:keys [^Serde key-serde ^Serde value-serde] :as t}] + (^KafkaConsumer [config {:keys [^Serde key-serde ^Serde value-serde] :as t}] (when-not (or key-serde (get config "key.deserializer")) @@ -134,7 +134,7 @@ topic-configs)) consumer) -(defn ^KafkaConsumer subscribed-consumer +(defn subscribed-consumer "Given a broker configuration and topics, returns a consumer that is subscribed to all of the given topic descriptors. @@ -142,7 +142,7 @@ single pair of key and value serde instances. The serdes of the first requested topic are used, and all other topics are expected to be able to use same serdes." - [config topic-configs] + ^KafkaConsumer [config topic-configs] (when-not (sequential? topic-configs) (throw (ex-info "subscribed-consumer takes a seq of topics!" {:topic-configs topic-configs}))) diff --git a/src/jackdaw/client/partitioning.clj b/src/jackdaw/client/partitioning.clj index 63634d32..2b07550a 100644 --- a/src/jackdaw/client/partitioning.clj +++ b/src/jackdaw/client/partitioning.clj @@ -34,7 +34,8 @@ in `jackdaw.client` but backed by the partitioning machinery." {:license "BSD 3-Clause License "} - (:require [jackdaw.client :as jc] + (:require [clojure.string :as str] + [jackdaw.client :as jc] [jackdaw.data :as jd]) (:import org.apache.kafka.clients.producer.Producer org.apache.kafka.common.serialization.Serde @@ -49,9 +50,9 @@ [{:keys [record-key] :as t}] (let [record-key (as-> record-key % (-> % - (clojure.string/replace "$." "") - (clojure.string/replace "_" "-") - (clojure.string/split #"\.")) + (str/replace "$." "") + (str/replace "_" "-") + (str/split #"\.")) (mapv keyword %))] (assoc t ::key-fn #(get-in % record-key)))) @@ -63,7 +64,7 @@ (defn default-partition "The kafka default partitioner. As a `::partition-fn`" - [{:keys [topic-name key-serde]} key value partitions] + [{:keys [topic-name key-serde]} key _value partitions] (let [key-bytes (.serialize (.serializer ^Serde key-serde) topic-name key)] (default-partitioner* key-bytes partitions))) @@ -91,11 +92,11 @@ (partition-fn t key value %) (->ProducerRecord producer t % key value)) (jd/->ProducerRecord t key value))) - ([^Producer producer topic partition key value] + ([^Producer _producer topic partition key value] (jd/->ProducerRecord topic (int partition) key value)) - ([^Producer producer topic partition timestamp key value] + ([^Producer _producer topic partition timestamp key value] (jd/->ProducerRecord topic partition timestamp key value)) - ([^Producer producer topic partition timestamp key value headers] + ([^Producer _producer topic partition timestamp key value headers] (jd/->ProducerRecord topic partition timestamp key value headers))) (defn produce! @@ -107,15 +108,15 @@ ([producer topic value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic key value] + ([producer topic _key value] (jc/send! producer (->ProducerRecord producer topic value))) - ([producer topic partition key value] + ([producer topic partition _key value] (jc/send! producer (->ProducerRecord producer topic partition topic value))) - ([producer topic partition timestamp key value] + ([producer topic partition timestamp _key value] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value))) - ([producer topic partition timestamp key value headers] + ([producer topic partition timestamp _key value headers] (jc/send! producer (->ProducerRecord producer topic partition timestamp topic value headers)))) diff --git a/src/jackdaw/data.clj b/src/jackdaw/data.clj index cb438d4a..8b610094 100644 --- a/src/jackdaw/data.clj +++ b/src/jackdaw/data.clj @@ -34,7 +34,8 @@ (datafy [o] o))))) ;;; Just vendor this - not worth the footwork to import the "real" one - +;; Ignore clj-kondo's warning: Unresolved namespace clojure.core.protocols. Are you missing a require? +#_{:clj-kondo/ignore [:unresolved-namespace]} (defn datafy "Attempts to return x as data. diff --git a/src/jackdaw/data/admin.clj b/src/jackdaw/data/admin.clj index 2e753ddc..61ef07df 100644 --- a/src/jackdaw/data/admin.clj +++ b/src/jackdaw/data/admin.clj @@ -16,7 +16,6 @@ (ConfigEntry. k (:value v)))) (defn->data ConfigEntry->data - "" [^ConfigEntry e] {:name (.name e) :value (.value e) @@ -27,13 +26,11 @@ ;;; Config (defn map->Config - "" ^Config [m] (Config. (map (partial apply ->ConfigEntry) m))) (defn->data Config->data - "" [^Config c] (into {} (comp (map ConfigEntry->data) @@ -44,7 +41,6 @@ ;;; TopicDescription (defn->data TopicDescription->data - "" [^TopicDescription td] {:is-internal? (.isInternal td) :partition-info (map datafy (.partitions td))}) @@ -52,7 +48,6 @@ ;;; NewTopic (defn map->NewTopic - "" [{:keys [:topic-name :partition-count :replication-factor @@ -71,7 +66,6 @@ ;;;; Result types (defn->data DescribeClusterResult->data - "" [^DescribeClusterResult dcr] {:cluster-id (-> dcr .clusterId .get) :controller (-> dcr .controller .get datafy) diff --git a/src/jackdaw/data/common.clj b/src/jackdaw/data/common.clj index ec42833a..054d1d57 100644 --- a/src/jackdaw/data/common.clj +++ b/src/jackdaw/data/common.clj @@ -10,7 +10,6 @@ ;;; Node (defn->data Node->data - "" [^Node node] {:host (.host node) :port (.port node) @@ -31,7 +30,6 @@ ;;; TopicPartitionInfo (defn->data TopicPartitionInfo->data - "" [^TopicPartitionInfo tpi] {:isr (mapv datafy (.isr tpi)) :leader (datafy (.leader tpi)) @@ -40,18 +38,16 @@ ;;; Topic partition tuples -(defn ^TopicPartition ->TopicPartition +(defn ->TopicPartition "Given unrolled ctor-style arguments, create a Kafka `TopicPartition`." - [{:keys [:topic-name]} partition] + ^TopicPartition [{:keys [:topic-name]} partition] (TopicPartition. topic-name (int partition))) (defn map->TopicPartition - "Given a `::topic-parititon`, build an equivalent `TopicPartition`. + "Given a `topic-partition`, build an equivalent `TopicPartition`. Inverts `(datafy ^TopicPartition tp)`." - [{:keys [topic-name - partition] - :as m}] + [{:keys [partition] :as m}] (->TopicPartition m partition)) (defn->data TopicPartition->data [^TopicPartition tp] @@ -59,7 +55,6 @@ :partition (.partition tp)}) (defn as-TopicPartition - "" ^TopicPartition [o] (cond (instance? TopicPartition o) o diff --git a/src/jackdaw/data/common_config.clj b/src/jackdaw/data/common_config.clj index d6044f4e..e4d6bb24 100644 --- a/src/jackdaw/data/common_config.clj +++ b/src/jackdaw/data/common_config.clj @@ -8,15 +8,12 @@ ;;; ConfigResource.Type (def +broker-config-resource-type+ - "" ConfigResource$Type/BROKER) (def +topic-config-resource-type+ - "" ConfigResource$Type/TOPIC) (def +unknown-config-resource-type+ - "" ConfigResource$Type/UNKNOWN) (defn ->ConfigResourceType [o] @@ -26,7 +23,6 @@ +unknown-config-resource-type+)) (defn->data ConfigResourceType->data - "" [^ConfigResource$Type crt] (cond (= +broker-config-resource-type+ crt) :config-resource/broker @@ -40,22 +36,18 @@ ;;; ConfigResource (defn ->ConfigResource - "" [^ConfigResource$Type type ^String name] (ConfigResource. type name)) (defn ->topic-resource - "" [name] (->ConfigResource +topic-config-resource-type+ name)) (defn ->broker-resource - "" [name] (->ConfigResource +broker-config-resource-type+ name)) (defn->data ConfigResource->data - "" [^ConfigResource cr] {:name (.name cr) :type (datafy (.type cr))}) diff --git a/src/jackdaw/data/consumer.clj b/src/jackdaw/data/consumer.clj index 8a451274..04a8c485 100644 --- a/src/jackdaw/data/consumer.clj +++ b/src/jackdaw/data/consumer.clj @@ -6,15 +6,16 @@ (import '[org.apache.kafka.clients.consumer ConsumerRecord OffsetAndTimestamp] - 'org.apache.kafka.common.header.Headers) + 'org.apache.kafka.common.header.Headers + 'org.apache.kafka.common.record.TimestampType) (set! *warn-on-reflection* true) -(defn ^ConsumerRecord ->ConsumerRecord +(defn ->ConsumerRecord "Given unrolled ctor-style arguments create a Kafka `ConsumerRecord`. Convenient for testing the consumer API and its helpers." - [{:keys [:topic-name]} partition offset ts ts-type + ^ConsumerRecord [{:keys [:topic-name]} partition offset ts ts-type key-size value-size key value ^Headers headers] (ConsumerRecord. topic-name (int partition) @@ -72,16 +73,15 @@ ;;; OffsetAndTimestamp tuples -(defn ^OffsetAndTimestamp ->OffsetAndTimestamp - [{:keys [offset timestamp]}] +(defn ->OffsetAndTimestamp + ^OffsetAndTimestamp [{:keys [offset timestamp]}] (OffsetAndTimestamp. offset (long timestamp))) (defn->data OffsetAndTimestamp->data [^OffsetAndTimestamp ots] {:offset (.offset ots) :timestamp (.timestamp ots)}) -(defn map->OffsetAndTimestamp - [{:keys [offset timestamp] :as m}] +(defn map->OffsetAndTimestamp [m] (->OffsetAndTimestamp m)) (defn as-OffsetAndTimestamp diff --git a/src/jackdaw/data/producer.clj b/src/jackdaw/data/producer.clj index b79464da..87c61f32 100644 --- a/src/jackdaw/data/producer.clj +++ b/src/jackdaw/data/producer.clj @@ -13,27 +13,27 @@ ;;; Producer record -(defn ^ProducerRecord ->ProducerRecord +(defn ->ProducerRecord "Given unrolled ctor-style arguments creates a Kafka `ProducerRecord`." - ([{:keys [topic-name]} value] + (^ProducerRecord [{:keys [topic-name]} value] (ProducerRecord. ^String topic-name value)) - ([{:keys [topic-name]} key value] + (^ProducerRecord [{:keys [topic-name]} key value] (ProducerRecord. ^String topic-name key value)) - ([{:keys [topic-name]} partition key value] - (let [partition-or-nil (if partition (int partition))] + (^ProducerRecord [{:keys [topic-name]} partition key value] + (let [partition-or-nil (when partition (int partition))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil key value))) - ([{:keys [topic-name]} partition timestamp key value] - (let [partition-or-nil (if partition (int partition)) - timestamp-or-nil (if timestamp (long timestamp))] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value] + (let [partition-or-nil (when partition (int partition)) + timestamp-or-nil (when timestamp (long timestamp))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil ^Long timestamp-or-nil key value))) - ([{:keys [topic-name]} partition timestamp key value headers] - (let [partition-or-nil (if partition (int partition)) - timestamp-or-nil (if timestamp (long timestamp))] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value headers] + (let [partition-or-nil (when partition (int partition)) + timestamp-or-nil (when timestamp (long timestamp))] (ProducerRecord. ^String topic-name ^Integer partition-or-nil ^Long timestamp-or-nil @@ -82,31 +82,31 @@ required. The third arity allows a user to provide a checksum. This arity may be removed in the future pending further breaking changes to the Kafka APIs." - ([{:keys [:topic-name] :as t} partition offset timestamp key-size value-size] + ([t partition offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) offset 0 ;; Force absolute offset timestamp nil ;; No checksum, it's deprecated - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size)))) + ([t partition base-offset relative-offset timestamp key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset relative-offset ;; Full offset control timestamp nil ;; No checksum, it's depreciated - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size)))) - ([{:keys [:topic-name] :as t} partition base-offset relative-offset timestamp checksum + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size)))) + ([t partition base-offset relative-offset timestamp checksum key-size value-size] (RecordMetadata. (->TopicPartition t partition) base-offset relative-offset ;; Full offset control timestamp checksum ;; Have fun I guess - ^Integer (if key-size (int key-size)) - ^Integer (if value-size (int value-size))))) + ^Integer (when key-size (int key-size)) + ^Integer (when value-size (int value-size))))) (defn map->RecordMetadata "Given a `::record-metdata`, build an equivalent `RecordMetadata`. diff --git a/src/jackdaw/serdes/avro.clj b/src/jackdaw/serdes/avro.clj index 53caaff8..66ae89ae 100644 --- a/src/jackdaw/serdes/avro.clj +++ b/src/jackdaw/serdes/avro.clj @@ -68,12 +68,12 @@ KafkaAvroSerializer KafkaAvroDeserializer] java.lang.CharSequence java.nio.ByteBuffer - [java.io ByteArrayOutputStream ByteArrayInputStream] + [java.io ByteArrayOutputStream] [java.util Collection Map UUID] [org.apache.avro AvroTypeException Schema$Parser Schema$ArraySchema Schema Schema$Field] [org.apache.avro.io - EncoderFactory DecoderFactory JsonEncoder] + EncoderFactory DecoderFactory] [org.apache.avro.generic GenericDatumWriter GenericDatumReader GenericContainer GenericData$Array GenericData$EnumSymbol @@ -91,10 +91,10 @@ (when schema-str (.parse (Schema$Parser.) ^String schema-str))))) -(defn- ^String mangle [^String n] +(defn- mangle ^String [^String n] (str/replace n #"-" "_")) -(defn- ^String unmangle [^String n] +(defn- unmangle ^String [^String n] (str/replace n #"_" "-")) (defn- dispatch-on-type-fields @@ -198,7 +198,7 @@ (try (and (number? x) (coercion-fn (bigint x))) - (catch RuntimeException e + (catch RuntimeException _e false))) (defrecord DoubleType [] @@ -260,12 +260,14 @@ (defrecord SchemalessType [] SchemaCoercion - (match-clj? [_ x] + (match-clj? [_schema-type _clj-data] true) - (match-avro? [_ x] + (match-avro? [_schema-type _avro-data] true) - (avro->clj [_ x] x) - (clj->avro [_ x path] x)) + (avro->clj [_schema-type avro-data] + avro-data) + (clj->avro [_schema-type clj-data _path] + clj-data)) ;; UUID :disapprove: @@ -278,7 +280,7 @@ (avro->clj [_ uuid-utf8] (try (UUID/fromString (str uuid-utf8)) - (catch Exception e + (catch Exception _e (str uuid-utf8)))) (clj->avro [this uuid path] (validate-clj! this uuid path "uuid") @@ -306,6 +308,8 @@ (clj->avro element-coercion x (conj path i))) clj-seq)))) + +#_{:clj-kondo/ignore [:redefined-var]} (defn ->ArrayType "Wrapper by which to construct a `ArrayType` which handles the structural recursion of building the handler stack so that the @@ -339,6 +343,7 @@ (mangle) (GenericData$EnumSymbol. schema)))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->EnumType [_schema->coercion ^Schema schema] (EnumType. schema)) @@ -380,6 +385,8 @@ [k (clj->avro value-coercion v (conj path k))])) clj-map))) + +#_{:clj-kondo/ignore [:redefined-var]} (defn ->MapType "Wrapper by which to construct a `MapType` which handles the structural recursion of building the handler stack so that the @@ -452,6 +459,7 @@ (throw (ex-info (str (.getMessage e)) {:path path, :clj-data clj-map} e))))))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->RecordType "Wrapper by which to construct a `RecordType` which handles the structural recursion of building the handler stack so that the @@ -490,6 +498,7 @@ {:path path, :clj-data clj-data} (AvroTypeException. "Type Error")))))) +#_{:clj-kondo/ignore [:redefined-var]} (defn ->UnionType "Wrapper by which to construct a `UnionType` which handles the structural recursion of building the handler stack so that the @@ -639,17 +648,14 @@ (get @coercion-cache avro-schema))))) (defn- coercion-type - [avro-schema {:keys [type-registry - coercion-cache] :as coercion-stack}] + [avro-schema coercion-stack] ((schema->coercion coercion-stack) avro-schema)) (defn as-json "Returns the json representation of the supplied `edn+avro` `edn+avro` is an avro object represented as an edn object (compatible with the jackdaw avro serde)" - [{:keys [type-registry - avro-schema - coercion-cache] :as coercion-stack} edn+avro] + [{:keys [avro-schema] :as coercion-stack} edn+avro] (let [schema (parse-schema-str avro-schema) record (clj->avro (coercion-type schema coercion-stack) edn+avro []) out-stream (ByteArrayOutputStream.) @@ -665,9 +671,7 @@ "Returns the edn representation of the supplied `json+avro` `json+avro` is an avro object represented as a json string" - [{:keys [type-registry - coercion-cache - avro-schema] :as coercion-stack} json+avro] + [{:keys [avro-schema] :as coercion-stack} json+avro] (let [schema (parse-schema-str avro-schema) decoder (.jsonDecoder ^DecoderFactory (DecoderFactory.) ^Schema schema diff --git a/src/jackdaw/serdes/edn.clj b/src/jackdaw/serdes/edn.clj index 03e7191c..36d6de89 100644 --- a/src/jackdaw/serdes/edn.clj +++ b/src/jackdaw/serdes/edn.clj @@ -11,7 +11,6 @@ (:require [clojure.edn] [jackdaw.serdes.fn :as jsfn]) (:import java.nio.charset.StandardCharsets - org.apache.kafka.common.serialization.Serde org.apache.kafka.common.serialization.Serdes)) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/serdes/edn2.clj b/src/jackdaw/serdes/edn2.clj index f4d12bb0..01590730 100644 --- a/src/jackdaw/serdes/edn2.clj +++ b/src/jackdaw/serdes/edn2.clj @@ -2,9 +2,7 @@ "Implements an EDN SerDes (Serializer/Deserializer)." (:require [clojure.edn] [jackdaw.serdes.fn :as jsfn]) - (:import java.nio.charset.StandardCharsets - org.apache.kafka.common.serialization.Serde - org.apache.kafka.common.serialization.Serdes) + (:import java.nio.charset.StandardCharsets) (:gen-class :implements [org.apache.kafka.common.serialization.Serde] :prefix "EdnSerde-" diff --git a/src/jackdaw/serdes/fn_impl.clj b/src/jackdaw/serdes/fn_impl.clj index 4c1ec51f..ffdb361d 100644 --- a/src/jackdaw/serdes/fn_impl.clj +++ b/src/jackdaw/serdes/fn_impl.clj @@ -2,7 +2,7 @@ "FIXME" {:license "BSD 3-Clause License "} (:import [org.apache.kafka.common.serialization - Deserializer Serdes Serializer])) + Deserializer Serializer])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/serdes/fressian.clj b/src/jackdaw/serdes/fressian.clj index bfccd243..edb1fff2 100644 --- a/src/jackdaw/serdes/fressian.clj +++ b/src/jackdaw/serdes/fressian.clj @@ -3,9 +3,7 @@ {:license "BSD 3-Clause License "} (:require [clojure.data.fressian :as fressian] [jackdaw.serdes.fn :as jsfn]) - (:import org.apache.kafka.common.serialization.Serde - org.apache.kafka.common.serialization.Serdes - [java.io ByteArrayOutputStream Closeable] + (:import java.io.ByteArrayOutputStream org.fressian.FressianWriter) (:gen-class :implements [org.apache.kafka.common.serialization.Serde] diff --git a/src/jackdaw/serdes/resolver.clj b/src/jackdaw/serdes/resolver.clj index 7cd1b6c3..29284e1b 100644 --- a/src/jackdaw/serdes/resolver.clj +++ b/src/jackdaw/serdes/resolver.clj @@ -2,8 +2,8 @@ "Helper function for creating serdes." (:require [clojure.java.io :as io] [clojure.spec.alpha :as s] - [jackdaw.serdes.avro.confluent :as c-avro] - [jackdaw.serdes.json-schema.confluent :as c-json] + [jackdaw.serdes.avro.confluent] + [jackdaw.serdes.json-schema.confluent] [jackdaw.serdes.edn] [jackdaw.serdes.json] [jackdaw.serdes] diff --git a/src/jackdaw/streams/configured.clj b/src/jackdaw/streams/configured.clj index 59b5c517..ac646437 100644 --- a/src/jackdaw/streams/configured.clj +++ b/src/jackdaw/streams/configured.clj @@ -2,8 +2,9 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) + #_{:clj-kondo/ignore [:refer-all]} (:require [jackdaw.streams.protocols :refer :all] - [jackdaw.streams.configurable :refer [config IConfigurable]])) + [jackdaw.streams.configurable :refer :all])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/streams/extras.clj b/src/jackdaw/streams/extras.clj index 1bf75ed9..7646702c 100644 --- a/src/jackdaw/streams/extras.clj +++ b/src/jackdaw/streams/extras.clj @@ -46,15 +46,15 @@ (^void onBatchRestored [_ ^TopicPartition topicPartition - ^String storeName - ^long batchEndOffset - ^long numRestored] + ^String _storeName + ^long _batchEndOffset + ^long _numRestored] (log/warnf "Restored a batch from (%s.%d)" (.topic topicPartition) (.partition topicPartition))) (^void onRestoreEnd [_ ^TopicPartition topicPartition ^String storeName - ^long totalRestored] + ^long _totalRestored] (let [start-date (get @restore-tracker storeName) elapsed-sec (.getSeconds (Duration/between start-date (Instant/now)))] diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 5a59bfd2..e853087b 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -2,18 +2,15 @@ "Clojure wrapper to kafka streams." {:license "BSD 3-Clause License "} (:refer-clojure :exclude [count map reduce group-by merge filter peek]) + #_{:clj-kondo/ignore [:refer-all]} (:require [jackdaw.streams.protocols :refer :all] [jackdaw.streams.lambdas :refer :all]) (:import [java.util Collection] [java.util.regex Pattern] - [org.apache.kafka.common.serialization - Serde] [java.time Duration] - [org.apache.kafka.streams - KafkaStreams] [org.apache.kafka.streams StreamsBuilder] [org.apache.kafka.streams.kstream @@ -22,11 +19,8 @@ KeyValueMapper Materialized Merger Predicate Printed Produced Reducer SessionWindowedKStream SessionWindows Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner - ValueMapper ValueMapperWithKey ValueTransformerSupplier Windows] - [org.apache.kafka.streams.processor - StreamPartitioner] - [org.apache.kafka.streams.state - KeyValueStore Stores] + ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier] + [org.apache.kafka.streams.state Stores] (org.apache.kafka.streams.processor.api ProcessorSupplier))) diff --git a/src/jackdaw/streams/lambdas.clj b/src/jackdaw/streams/lambdas.clj index 7f2341e9..a6f4d0b6 100644 --- a/src/jackdaw/streams/lambdas.clj +++ b/src/jackdaw/streams/lambdas.clj @@ -18,7 +18,7 @@ (deftype FnAggregator [aggregator-fn] Aggregator - (apply [this agg-key value aggregate] + (apply [_this agg-key value aggregate] (aggregator-fn aggregate [agg-key value]))) (defn aggregator @@ -28,7 +28,7 @@ (deftype FnForeachAction [foreach-action-fn] ForeachAction - (apply [this key value] + (apply [_this key value] (foreach-action-fn [key value]) nil)) @@ -39,7 +39,7 @@ (deftype FnInitializer [initializer-fn] Initializer - (apply [this] + (apply [_this] (initializer-fn))) (defn initializer @@ -49,7 +49,7 @@ (deftype FnKeyValueMapper [key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (key-value (key-value-mapper-fn [key value])))) (defn key-value-mapper @@ -59,7 +59,7 @@ (deftype FnSelectKeyValueMapper [select-key-value-mapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (select-key-value-mapper-fn [key value]))) (defn select-key-value-mapper @@ -70,7 +70,7 @@ (deftype FnKeyValueFlatMapper [key-value-flatmapper-fn] KeyValueMapper - (apply [this key value] + (apply [_this key value] (mapv key-value (key-value-flatmapper-fn [key value])))) (defn key-value-flatmapper @@ -83,7 +83,7 @@ (deftype FnMerger [merger-fn] Merger - (apply [this agg-key aggregate1 aggregate2] + (apply [_this agg-key aggregate1 aggregate2] (merger-fn agg-key aggregate1 aggregate2))) (defn merger @@ -93,7 +93,7 @@ (deftype FnPredicate [predicate-fn] Predicate - (test [this key value] + (test [_this key value] (boolean (predicate-fn [key value])))) (defn predicate @@ -103,7 +103,7 @@ (deftype FnReducer [reducer-fn] Reducer - (apply [this value1 value2] + (apply [_this value1 value2] (reducer-fn value1 value2))) (defn reducer @@ -113,7 +113,7 @@ (deftype FnValueJoiner [value-joiner-fn] ValueJoiner - (apply [this value1 value2] + (apply [_this value1 value2] (value-joiner-fn value1 value2))) (defn value-joiner @@ -123,7 +123,7 @@ (deftype FnValueMapper [value-mapper-fn] ValueMapper - (apply [this value] + (apply [_this value] (value-mapper-fn value))) (defn value-mapper @@ -133,7 +133,7 @@ (deftype FnStreamPartitioner [stream-partitioner-fn] StreamPartitioner - (partition [this topic-name key val partition-count] + (partition [_this topic-name key val partition-count] (stream-partitioner-fn topic-name key val partition-count))) (defn stream-partitioner @@ -157,7 +157,7 @@ (deftype FnProcessorSupplier [processor-supplier-fn] ProcessorSupplier - (get [this] + (get [_this] (processor processor-supplier-fn))) (defn processor-supplier @@ -167,7 +167,7 @@ (deftype FnTransformerSupplier [transformer-supplier-fn] TransformerSupplier - (get [this] + (get [_this] (transformer-supplier-fn))) (defn transformer-supplier @@ -177,7 +177,7 @@ (deftype FnValueTransformerSupplier [value-transformer-supplier-fn] ValueTransformerSupplier - (get [this] + (get [_this] (value-transformer-supplier-fn))) (defn value-transformer-supplier @@ -187,10 +187,10 @@ (deftype FnTransformer [context xfm-fn] Transformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this k v] + (close [_this]) + (transform [_this k v] (xfm-fn @context k v))) (defn transformer-with-ctx @@ -211,10 +211,10 @@ (deftype FnValueTransformer [context xfm-fn] ValueTransformer - (init [this transformer-context] + (init [_this transformer-context] (reset! context transformer-context)) - (close [this]) - (transform [this v] + (close [_this]) + (transform [_this v] (xfm-fn @context v))) (defn value-transformer-with-ctx diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index 77bb06af..271f8d68 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -1,9 +1,7 @@ (ns jackdaw.streams.protocols "Kafka streams protocols." {:license "BSD 3-Clause License "} - (:refer-clojure :exclude [count map merge reduce group-by filter peek]) - (:import org.apache.kafka.streams.KafkaStreams - org.apache.kafka.streams.StreamsBuilder)) + (:refer-clojure :exclude [count map merge reduce group-by filter peek])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/streams/specs.clj b/src/jackdaw/streams/specs.clj index 593cf0b9..bb3ae095 100644 --- a/src/jackdaw/streams/specs.clj +++ b/src/jackdaw/streams/specs.clj @@ -1,5 +1,4 @@ (ns jackdaw.streams.specs - "" {:license "BSD 3-Clause License "} (:require [clojure.spec.alpha :as s] [jackdaw.specs] diff --git a/src/jackdaw/test.clj b/src/jackdaw/test.clj index 0e75b1bc..93133d3b 100644 --- a/src/jackdaw/test.clj +++ b/src/jackdaw/test.clj @@ -33,7 +33,6 @@ [jackdaw.test.journal :refer [with-journal]] [jackdaw.test.middleware :refer [with-timing with-status with-journal-snapshots]]) (:import - (java.io Closeable) (java.util Properties) (org.apache.kafka.streams Topology TopologyTestDriver StreamsBuilder))) @@ -77,7 +76,7 @@ consumer producer] java.io.Closeable - (close [this] + (close [_this] (doseq [hook exit-hooks] (hook)) (log/info "destroyed test machine"))) diff --git a/src/jackdaw/test/commands.clj b/src/jackdaw/test/commands.clj index 728249ca..8ae9c498 100644 --- a/src/jackdaw/test/commands.clj +++ b/src/jackdaw/test/commands.clj @@ -1,5 +1,4 @@ (ns jackdaw.test.commands - "" (:require [clojure.spec.alpha :as s] [jackdaw.test.commands.base :as base] diff --git a/src/jackdaw/test/commands/base.clj b/src/jackdaw/test/commands/base.clj index 579c1185..502feed6 100644 --- a/src/jackdaw/test/commands/base.clj +++ b/src/jackdaw/test/commands/base.clj @@ -7,13 +7,13 @@ (def command-map {:stop (constantly true) - :sleep (fn [machine [sleep-ms]] + :sleep (fn [_machine [sleep-ms]] (Thread/sleep sleep-ms)) - :println (fn [machine params] + :println (fn [_machine params] (println (apply str params))) - :pprint (fn [machine params] + :pprint (fn [_machine params] (pprint/pprint params)) :do (fn [machine [do-fn]] diff --git a/src/jackdaw/test/commands/watch.clj b/src/jackdaw/test/commands/watch.clj index 5b8d81ae..c780025d 100644 --- a/src/jackdaw/test/commands/watch.clj +++ b/src/jackdaw/test/commands/watch.clj @@ -1,7 +1,6 @@ (ns jackdaw.test.commands.watch (:require - [jackdaw.test.journal :as j] - [clojure.tools.logging :as log])) + [jackdaw.test.journal :as j])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/test/commands/write.clj b/src/jackdaw/test/commands/write.clj index 9a79abee..6f77440a 100644 --- a/src/jackdaw/test/commands/write.clj +++ b/src/jackdaw/test/commands/write.clj @@ -1,12 +1,11 @@ (ns jackdaw.test.commands.write (:require [manifold.stream :as s] - [clojure.tools.logging :as log] [jackdaw.client.partitioning :as partitioning])) (set! *warn-on-reflection* true) -(defn default-partition-fn [topic-map topic-name k v partition-count] +(defn default-partition-fn [topic-map _topic-name k _v partition-count] (int (partitioning/default-partition topic-map k nil partition-count))) (defn create-message [topic-map message opts] diff --git a/src/jackdaw/test/fixtures.clj b/src/jackdaw/test/fixtures.clj index 137a8065..63c73032 100644 --- a/src/jackdaw/test/fixtures.clj +++ b/src/jackdaw/test/fixtures.clj @@ -1,12 +1,10 @@ (ns jackdaw.test.fixtures - "" (:require [aleph.http :as http] [clojure.java.io :as io] [clojure.tools.logging :as log] [clojure.reflect :refer [resolve-class]] [jackdaw.streams :as k] - [jackdaw.streams.interop :refer [streams-builder]] [jackdaw.test.transports.kafka :as kt] [jackdaw.test.serde :refer [byte-array-serializer byte-array-deserializer]] [manifold.deferred :as d] @@ -32,31 +30,18 @@ (defn- create-topics "Creates " - [client kafka-config topic-config] + [client topic-config] (let [required (->> topic-config - (filter (fn [[k v]] + (filter (fn [[_k v]] (not (.contains (-> (list-topics client) .names .get) (:topic-name v))))) - (map (fn [[k v]] + (map (fn [[_k v]] (new-topic v))))] (-> (.createTopics client required) (.all)))) -(defn- delete-topics - [client kafka-config topic-config] - (let [deletable (->> topic-config - (filter (fn [[k v]] - (.contains (-> (list-topics client) - .names - .get) - (:topic-name v)))) - (map (fn [[k v]] - (:topic-name v))))] - (-> (.deleteTopics client deletable) - (.all)))) - (defn topic-fixture "Returns a fixture function that creates all the topics named in the supplied topic config before running a test function." @@ -66,7 +51,7 @@ ([kafka-config topic-config timeout-ms] (fn [t] (with-open [client (AdminClient/create kafka-config)] - (-> (create-topics client kafka-config topic-config) + (-> (create-topics client topic-config) (.get timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)) (log/info "topic-fixture: created topics: " (keys topic-config)) (t))))) @@ -113,7 +98,7 @@ (defn- set-error [error] (reify Thread$UncaughtExceptionHandler - (uncaughtException [_ t e] + (uncaughtException [_this _thread e] (log/error e (.getMessage e)) (reset! error e)))) @@ -220,7 +205,6 @@ (if-not (class-exists? 'kafka.tools.StreamsResetter) (throw (RuntimeException. "You must add a dependency on a kafka distrib which ships the kafka.tools.StreamsResetter tool")) (let [rt (.newInstance (clojure.lang.RT/classForName "kafka.tools.StreamsResetter")) - app-id (get app-config "application.id") args (concat ["--application-id" (get app-config "application.id") "--bootstrap-servers" (get app-config "bootstrap.servers")] reset-args) diff --git a/src/jackdaw/test/journal.clj b/src/jackdaw/test/journal.clj index 459414a7..ae1a10d7 100644 --- a/src/jackdaw/test/journal.clj +++ b/src/jackdaw/test/journal.clj @@ -1,5 +1,4 @@ (ns jackdaw.test.journal - "" (:require [clojure.set :refer [subset?]] [clojure.tools.logging :as log] @@ -24,7 +23,7 @@ (remove-watch journal id) (deliver p {:result :found :info result})))] - (add-watch journal id (fn [k r old new] + (add-watch journal id (fn [_k _r _old new] (check-condition new))) ;; don't rely on watcher to 'check-condition' ;; in case journal is already in a final, good state @@ -56,9 +55,9 @@ (get m topic))) (defn journal-result - [machine record] "Journals the `record` in the appropriate place in the supplied test machine's `:journal`" + [machine record] (let [journal (:journal machine)] (if-let [err (agent-error journal)] (throw err) @@ -122,27 +121,30 @@ [journal topic-name ks value] (messages-by-kv-fn journal topic-name ks #(= value %))) -(defn by-key [topic-name ks value] +(defn by-key "Returns the first message in the topic where attribute 'ks' is equal to 'value'. Can be combined with the :watch command to assert that a message has been published: [:watch (j/by-key :result-topic [:object :color] \"red\")]" + [topic-name ks value] (fn [journal] (first (messages-by-kv journal topic-name ks value)))) -(defn by-keys [topic-name ks values] +(defn by-keys "Returns all of the messages in the topic where attribute 'ks' is equal to one of the values. Can be combined with the :watch command to assert that messages have been published: [:watch (j/by-key :result-topic [:object :color] #{\"red\" \"green\" \"blue\"})]" + [topic-name ks values] (fn [journal] (messages-by-kv-fn journal topic-name ks (set values)))) -(defn by-id [topic-name value] +(defn by-id "Returns all of the messages in the topic with an id of `value`. Can be combined with the :watch command to assert that a message with the supplied id has been published: [:watch (j/by-id :result-topic 123)]" + [topic-name value] (by-key topic-name [:id] value)) (defn all-keys-present diff --git a/src/jackdaw/test/serde.clj b/src/jackdaw/test/serde.clj index 67449a9d..e2d7f9e5 100644 --- a/src/jackdaw/test/serde.clj +++ b/src/jackdaw/test/serde.clj @@ -1,14 +1,11 @@ (ns jackdaw.test.serde (:require - [clojure.tools.logging :as log] [jackdaw.serdes.edn :as edn-serde] [jackdaw.serdes.json :as json-serde]) (:import - (org.apache.kafka.clients.consumer ConsumerRecord) - (org.apache.kafka.common.serialization Deserializer Serdes Serializer + (org.apache.kafka.common.serialization Serdes ByteArraySerializer - ByteArrayDeserializer) - (org.apache.kafka.common.errors SerializationException))) + ByteArrayDeserializer))) (set! *warn-on-reflection* false) @@ -93,7 +90,7 @@ "Returns a map of topics to the corresponding deserializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (deserializer v)])) (into {}))) @@ -102,7 +99,7 @@ "Returns a map of topic to the corresponding serializer" [topic-config] (->> topic-config - (map (fn [[k v]] + (map (fn [[_k v]] [(:topic-name v) (serializer v)])) (into {}))) diff --git a/src/jackdaw/test/transports/identity.clj b/src/jackdaw/test/transports/identity.clj index 95787877..0a923fb2 100644 --- a/src/jackdaw/test/transports/identity.clj +++ b/src/jackdaw/test/transports/identity.clj @@ -1,8 +1,7 @@ (ns jackdaw.test.transports.identity (:require - [clojure.tools.logging :as log] [manifold.stream :as s] - [jackdaw.test.transports :as t :refer [deftransport]])) + [jackdaw.test.transports :refer [deftransport]])) (set! *warn-on-reflection* true) diff --git a/src/jackdaw/test/transports/kafka.clj b/src/jackdaw/test/transports/kafka.clj index 9f506863..55f9598a 100644 --- a/src/jackdaw/test/transports/kafka.clj +++ b/src/jackdaw/test/transports/kafka.clj @@ -14,7 +14,6 @@ (:import org.apache.kafka.common.header.Header org.apache.kafka.clients.consumer.Consumer - org.apache.kafka.streams.KafkaStreams$StateListener org.apache.kafka.clients.consumer.ConsumerRecord org.apache.kafka.clients.producer.Producer org.apache.kafka.clients.producer.ProducerRecord)) @@ -85,15 +84,15 @@ (.key ^Header header) (.value ^Header header))) {} (.headers consumer-record))})) -(defn ^ProducerRecord mk-producer-record +(defn mk-producer-record "Creates a kafka ProducerRecord for use with `send!`." - ([{:keys [topic-name]} value] + (^ProducerRecord [{:keys [topic-name]} value] (ProducerRecord. ^String topic-name value)) - ([{:keys [topic-name]} key value] + (^ProducerRecord [{:keys [topic-name]} key value] (ProducerRecord. ^String topic-name key value)) - ([{:keys [topic-name]} partition key value] + (^ProducerRecord [{:keys [topic-name]} partition key value] (ProducerRecord. ^String topic-name ^Integer (int partition) key value)) - ([{:keys [topic-name]} partition timestamp key value] + (^ProducerRecord [{:keys [topic-name]} partition timestamp key value] (ProducerRecord. ^String topic-name ^Integer (int partition) ^Long timestamp key value))) (defn consumer @@ -117,7 +116,7 @@ {:process (d/loop [consumer (subscription kafka-config (vals topic-metadata))] (d/chain (d/future consumer) - (fn [c] + (fn [_c] (when-not (realized? started?) (deliver started? true) (log/infof "started kafka consumer: %s" @@ -175,7 +174,7 @@ (defn producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([kafka-config topic-config serializers] + ([kafka-config _topic-config serializers] (let [producer (kafka/producer kafka-config byte-array-serde) messages (s/stream 1 (map (fn [x] (try @@ -202,7 +201,7 @@ :else (do (.close ^Producer producer) - (log/infof "stopped kafka producer: " + (log/infof "stopped kafka producer: %s" (select-keys kafka-config ["bootstrap.servers" "group.id"])))))))] {:producer producer diff --git a/src/jackdaw/test/transports/mock.clj b/src/jackdaw/test/transports/mock.clj index 955b57e4..a2440fa5 100644 --- a/src/jackdaw/test/transports/mock.clj +++ b/src/jackdaw/test/transports/mock.clj @@ -4,7 +4,7 @@ [clojure.tools.logging :as log] [jackdaw.test.journal :as j] [jackdaw.test.transports :as t :refer [deftransport]] - [jackdaw.test.serde :refer [byte-array-serializer byte-array-deserializer + [jackdaw.test.serde :refer [byte-array-deserializer apply-serializers apply-deserializers serde-map]] [manifold.stream :as s] [manifold.deferred :as d]) @@ -109,7 +109,7 @@ poll (poller messages topic-config)] {:process (d/loop [cont? @continue?] - (d/chain cont? (fn [d] + (d/chain cont? (fn [_d] (when-not (realized? started?) (log/info "started mock consumer: %s" {:driver driver}) (deliver started? true)) @@ -155,8 +155,7 @@ :offset (.offset input-record)}) (d/recur (s/take! messages))) - :else (do - (log/infof "stopped mock producer: %s" {:driver driver}))))))] + :else (log/infof "stopped mock producer: %s" {:driver driver})))))] {:messages messages :process process})) diff --git a/src/jackdaw/test/transports/rest_proxy.clj b/src/jackdaw/test/transports/rest_proxy.clj index 4cbd5894..dd6716d3 100644 --- a/src/jackdaw/test/transports/rest_proxy.clj +++ b/src/jackdaw/test/transports/rest_proxy.clj @@ -5,9 +5,10 @@ [clojure.data.json :as json] [clojure.tools.logging :as log] [clojure.stacktrace :as stacktrace] + [clojure.string :as str] [jackdaw.test.journal :as j] [jackdaw.test.transports :as t :refer [deftransport]] - [jackdaw.test.serde :refer :all] + [jackdaw.test.serde :refer [apply-deserializers apply-serializers serde-map]] [manifold.stream :as s] [manifold.deferred :as d]) (:import @@ -85,7 +86,7 @@ (json/read-str (:body %) :key-fn (comp keyword (fn [x] - (clojure.string/replace x "_" "-")))))) + (str/replace x "_" "-")))))) #(if-not (ok? (:status %)) (assoc % :error :proxy-error) %)))) @@ -163,13 +164,11 @@ (assoc client :base-uri base-uri, :instance-id instance-id)))))))) (defn with-subscription - [{:keys [base-uri group-id instance-id] :as client} topic-metadata] + [{:keys [base-uri] :as client} topic-metadata] (let [url (format "%s/subscription" base-uri) topics (map :topic-name (vals topic-metadata)) headers {"Accept" (content-types :json) - "Content-Type" (content-types :json)} - body {:topics topics}] - + "Content-Type" (content-types :json)}] (d/chain (handle-proxy-request (:post *http-client*) url headers {:topics topics}) (fn [response] (if (:error response) @@ -181,7 +180,7 @@ "Returns a function that takes a consumer and puts any messages retrieved by polling it onto the supplied `messages` channel" [consumer] - (let [{:keys [base-uri group-id instance-id]} consumer + (let [{:keys [base-uri]} consumer url (format "%s/records" base-uri) headers {"Accept" (content-types :byte-array)} body nil] @@ -236,7 +235,7 @@ (s/close! messages) (destroy-consumer client) (log/infof "stopped rest-proxy consumer: %s" (proxy-client-info client)))) - (d/chain client (fn [client] + (d/chain client (fn [_client] (s/put-all! messages msgs) (log/infof "collected %s messages from kafka" (count msgs)) (Thread/sleep 500) @@ -270,7 +269,7 @@ (defn rest-proxy-producer "Creates an asynchronous kafka producer to be used by a test-machine for for injecting test messages" - ([config topics serializers] + ([config _topics serializers] (let [producer (rest-proxy-client config) messages (s/stream 1 (map (fn [x] (try diff --git a/test/jackdaw/admin_test.clj b/test/jackdaw/admin_test.clj index 122db45e..ba5dba8b 100644 --- a/test/jackdaw/admin_test.clj +++ b/test/jackdaw/admin_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.admin-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.admin :as admin] [jackdaw.data :as data] [manifold.deferred :as d]) @@ -13,9 +13,9 @@ (extend MockAdminClient admin/Client (-> admin/client-impl - (merge {:alter-topics* (fn [this topics] + (merge {:alter-topics* (fn [_this topics] (d/future [:altered topics])) - :describe-configs* (fn [this configs] + :describe-configs* (fn [_this configs] (d/future (into {} (map #(vector % {"some-key" "some-value"}) configs))))}))) @@ -45,12 +45,11 @@ (def test-cluster (take 3 (node-seq 0 "test-host"))) (defn with-mock-admin-client [cluster f] - (let [effects (atom []) - client (MockAdminClient. cluster (first cluster))] + (let [client (MockAdminClient. cluster (first cluster))] (f client))) (deftest test-new-topic - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (let [t (data/map->NewTopic info) msg (format "cannot create topic from %s" t)] (is (instance? org.apache.kafka.clients.admin.NewTopic t) msg)))) @@ -73,7 +72,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[k info] test-topics] + (doseq [[_k info] test-topics] (is (admin/topic-exists? client info)))))) (deftest test-retry-exists? @@ -99,7 +98,7 @@ (with-mock-admin-client test-cluster (fn [client] (admin/create-topics! client (vals test-topics)) - (doseq [[topic-name topic-info] (admin/describe-topics client)] + (doseq [[_topic-name topic-info] (admin/describe-topics client)] (is (set= [:is-internal? :partition-info] (keys topic-info))))))) diff --git a/test/jackdaw/client/partitioning_test.clj b/test/jackdaw/client/partitioning_test.clj index b807cb1e..166335cb 100644 --- a/test/jackdaw/client/partitioning_test.clj +++ b/test/jackdaw/client/partitioning_test.clj @@ -1,6 +1,6 @@ -(ns jackdaw.client-test +(ns jackdaw.client.partitioning-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.client :as client] [jackdaw.client.partitioning :as part])) diff --git a/test/jackdaw/client_test.clj b/test/jackdaw/client_test.clj index c000886a..9e2eee14 100644 --- a/test/jackdaw/client_test.clj +++ b/test/jackdaw/client_test.clj @@ -1,5 +1,5 @@ (ns jackdaw.client-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [are deftest is testing]] [jackdaw.admin :as admin] [jackdaw.client :as client] [jackdaw.test.fixtures :as fix] @@ -86,7 +86,7 @@ (testing "producer callbacks" (testing "success" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok))))] @@ -96,7 +96,7 @@ (testing "failure" (let [result (promise) - cb (client/callback (fn [meta ex] + cb (client/callback (fn [_meta ex] (if ex (deliver result ex) (deliver result :ok)))) @@ -117,7 +117,7 @@ (testing "send with callback" (let [msg (data/->ProducerRecord {:topic-name "foo"} "1" "one") on-callback (promise) - result (client/send! producer msg (fn [meta ex] + result (client/send! producer msg (fn [_meta ex] (if ex (deliver on-callback ex) (deliver on-callback :ok))))] @@ -195,47 +195,44 @@ (deftest ^:integration partitions-for-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (testing "partition info" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "partition info" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (let [[pinfo] (-> (client/partitions-for consumer bar-topic) (data/datafy))] (is (response-ok? :partitions-for pinfo)))))) - (testing "single-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (testing "single-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] (is (= 1 (client/num-partitions consumer bar-topic)))))) - (testing "multi-partition consumer" - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [high-partition-topic])) + (testing "multi-partition consumer" + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [high-partition-topic])) (fn [consumer] (is (= 15 (client/num-partitions consumer high-partition-topic)))))) - (testing "single-partition producer" - (with-producer (client/producer (producer-config)) + (testing "single-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] (is (= 1 (client/num-partitions producer bar-topic)))))) - (testing "multi-partition producer" - (with-producer (client/producer (producer-config)) + (testing "multi-partition producer" + (with-producer (client/producer (producer-config)) (fn [producer] - (is (= 15 (client/num-partitions producer high-partition-topic))))))))) + (is (= 15 (client/num-partitions producer high-partition-topic)))))))) (defn mock-consumer "Returns a consumer that will return the supplied items (as ConsumerRecords) in response to successive calls of the `poll` method" [queue] (reify Consumer - (^ConsumerRecords poll [this ^long ms] + (^ConsumerRecords poll [_this ^long ms] (.poll queue ms TimeUnit/MILLISECONDS)) - (^ConsumerRecords poll [this ^Duration duration] + (^ConsumerRecords poll [_this ^Duration duration] (.poll queue (.toMillis duration) TimeUnit/MILLISECONDS)))) (defn poll-result [topic data] @@ -251,27 +248,24 @@ consumer (mock-consumer q)] (.put q (poll-result "test-topic" [[1 1] [2 2]])) (let [results (client/poll consumer 1000)] - (are [k v] (first results) + (are [_k _v] (first results) :topic "test-topic" :key 1 :value 1) - (are [k v] (second results) + (are [_k _v] (second results) :topic "test-topic" :key 2 :value 2)))) (deftest ^:integration position-all-test (fix/with-fixtures [(fix/topic-fixture (broker-config) test-topics 1000)] - (let [key-serde (:key-serde high-partition-topic) - value-serde (:value-serde high-partition-topic)] - - (with-consumer (-> (client/consumer (consumer-config "partition-test")) - (client/subscribe [bar-topic])) + (with-consumer (-> (client/consumer (consumer-config "partition-test")) + (client/subscribe [bar-topic])) (fn [consumer] ;; without an initial `poll`, there is no position info (client/poll consumer 0) (is (= {{:topic-name "bar" :partition 0} 0} - (client/position-all consumer)))))))) + (client/position-all consumer))))))) (defn with-topic-data "Helper for creating a randomly named topic and seeding it with data diff --git a/test/jackdaw/data_test.clj b/test/jackdaw/data_test.clj index a9553a41..ae017567 100644 --- a/test/jackdaw/data_test.clj +++ b/test/jackdaw/data_test.clj @@ -1,10 +1,7 @@ (ns jackdaw.data-test - (:require [clojure.test :refer :all] - [jackdaw.test.fixtures :as fix] - [jackdaw.test.serde :as serde] + (:require [clojure.test :refer [are deftest]] [jackdaw.data :as data]) - (:import [org.apache.kafka.clients.producer - ProducerRecord RecordMetadata] + (:import org.apache.kafka.clients.producer.ProducerRecord [org.apache.kafka.common.header Headers Header])) diff --git a/test/jackdaw/serdes/avro/integration_test.clj b/test/jackdaw/serdes/avro/integration_test.clj index 2eeb591d..1349777f 100644 --- a/test/jackdaw/serdes/avro/integration_test.clj +++ b/test/jackdaw/serdes/avro/integration_test.clj @@ -9,9 +9,7 @@ [jackdaw.serdes.avro :as avro] [jackdaw.serdes.avro.schema-registry :as reg] [jackdaw.test.fixtures :as fix]) - (:import [org.apache.avro Schema$Parser] - [org.apache.avro.generic GenericData$Record] - [org.apache.kafka.common.serialization Serde Serdes])) + (:import [org.apache.kafka.common.serialization Serde Serdes])) (set! *warn-on-reflection* false) @@ -37,29 +35,29 @@ (deftest mock-schema-registry (testing "schema can be serialized by registry client" - (let [serde ^Serde (avro/serde +type-registry+ +mock-schema-registry+ +topic-config+)] - (let [msg {:customer-id (uuid/v4) - :address {:value "foo" - :key-path "foo.bar.baz"}}] - (let [serialized (-> (.serializer serde) - (.serialize "foo" msg)) - deserialized (-> (.deserializer serde) - (.deserialize "foo" serialized))] - (is (= deserialized msg))))))) + (let [serde ^Serde (avro/serde +type-registry+ +mock-schema-registry+ +topic-config+) + msg {:customer-id (uuid/v4) + :address {:value "foo" + :key-path "foo.bar.baz"}} + serialized (-> (.serializer serde) + (.serialize "foo" msg)) + deserialized (-> (.deserializer serde) + (.deserialize "foo" serialized))] + (is (= deserialized msg))))) (deftest ^:integration real-schema-registry (fix/with-fixtures [(fix/service-ready? {:http-url +real-schema-registry-url+ :http-timeout 5000})] (testing "schema registry set in config" - (let [serde ^Serde (avro/serde +type-registry+ +real-schema-registry+ +topic-config+)] - (let [msg {:customer-id (uuid/v4) - :address {:value "foo" - :key-path "foo.bar.baz"}}] - (let [serialized (-> (.serializer serde) - (.serialize "foo" msg)) - deserialized (-> (.deserializer serde) - (.deserialize "foo" serialized))] - (is (= deserialized msg)))))))) + (let [serde ^Serde (avro/serde +type-registry+ +real-schema-registry+ +topic-config+) + msg {:customer-id (uuid/v4) + :address {:value "foo" + :key-path "foo.bar.baz"}} + serialized (-> (.serializer serde) + (.serialize "foo" msg)) + deserialized (-> (.deserializer serde) + (.deserialize "foo" serialized))] + (is (= deserialized msg)))))) ;;;; Client integration tests against real Kafka through a real topic diff --git a/test/jackdaw/serdes/avro_test.clj b/test/jackdaw/serdes/avro_test.clj index 154caaf0..7f4d4a2b 100644 --- a/test/jackdaw/serdes/avro_test.clj +++ b/test/jackdaw/serdes/avro_test.clj @@ -3,7 +3,6 @@ [clj-uuid :as uuid] [clojure.data :refer [diff]] [clojure.data.json :as json] - [clojure.pprint :refer [pprint]] [jackdaw.serdes.avro :as avro] [jackdaw.serdes.avro.schema-registry :as reg]) (:import [java.nio ByteBuffer] diff --git a/test/jackdaw/serdes/edn2_test.clj b/test/jackdaw/serdes/edn2_test.clj index 7e9685f7..13d92a74 100644 --- a/test/jackdaw/serdes/edn2_test.clj +++ b/test/jackdaw/serdes/edn2_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.edn2-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/serdes/edn_test.clj b/test/jackdaw/serdes/edn_test.clj index 9f2ce1af..a6eb70be 100644 --- a/test/jackdaw/serdes/edn_test.clj +++ b/test/jackdaw/serdes/edn_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.edn-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] diff --git a/test/jackdaw/serdes/fressian_test.clj b/test/jackdaw/serdes/fressian_test.clj index 2045c719..86d2bae2 100644 --- a/test/jackdaw/serdes/fressian_test.clj +++ b/test/jackdaw/serdes/fressian_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.serdes.fressian-test (:require [clojure.spec.alpha :as s] - [clojure.test :refer :all] + [clojure.test :refer [is testing]] [clojure.test.check.clojure-test :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] @@ -38,7 +38,7 @@ (def read-handlers (-> (merge {uri-tag (reify ReadHandler - (read [_ reader tag component-count] + (read [_ reader _tag _component-count] (URI. (.readObject reader))))} fressian/clojure-read-handlers) fressian/associative-lookup)) diff --git a/test/jackdaw/serdes/json_test.clj b/test/jackdaw/serdes/json_test.clj index 60cec73d..c0a4e357 100644 --- a/test/jackdaw/serdes/json_test.clj +++ b/test/jackdaw/serdes/json_test.clj @@ -1,7 +1,7 @@ (ns jackdaw.serdes.json-test (:require [clojure.data.json :as json] [clojure.java.io :as io] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.test.check.clojure-test :as ct :refer [defspec]] [clojure.test.check.generators :as gen] [clojure.test.check.properties :as prop] @@ -26,8 +26,7 @@ (deftest reverse-json-roundtrip-test (testing "JSON bytes are the same after deserialization and serialization." - (let [s (slurp (io/resource "resources/pass1.json")) - b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] + (let [b (.serialize (jsj/serializer) nil {:foo_bar "baz"})] (is (= (into [] b) (into [] (->> (.deserialize (jsj/deserializer) nil b) (.serialize (jsj/serializer) nil)))))))) diff --git a/test/jackdaw/specs_test.clj b/test/jackdaw/specs_test.clj index 22ffd2a8..2618a86e 100644 --- a/test/jackdaw/specs_test.clj +++ b/test/jackdaw/specs_test.clj @@ -1,5 +1,5 @@ (ns jackdaw.specs-test - (:require [jackdaw.specs :refer :all] + (:require [jackdaw.specs :refer [exactly-one-true?]] [clojure.test :refer [deftest are]])) (deftest exactly-one-true?-test diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index ae8c4e0b..94a95e3e 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -1,10 +1,9 @@ (ns jackdaw.streams-test "Tests of the kafka streams wrapper." (:require [clojure.spec.test.alpha :as stest] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.serdes.edn :as jse] [jackdaw.streams :as k] - [jackdaw.streams.configurable :as cfg] [jackdaw.streams.interop :as interop] [jackdaw.streams.lambdas :as lambdas :refer [key-value]] [jackdaw.streams.lambdas.specs] @@ -147,7 +146,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter (fn [[k v]] (> v 1))) + (k/filter (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -162,7 +161,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/filter-not (fn [[k v]] (> v 1))) + (k/filter-not (fn [[_k v]] (> v 1))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -262,7 +261,7 @@ driver (mock/build-driver (fn [builder] (let [[pos-stream neg-stream] (-> builder (k/kstream topic-a) - (k/branch [(fn [[k v]] + (k/branch [(fn [[_k v]] (<= 0 v)) (constantly true)]))] (k/to pos-stream topic-pos) @@ -398,7 +397,7 @@ records (atom []) driver (mock/build-driver (fn [builder] (-> (k/kstream builder topic-a) - (k/process! (fn [ctx k v] + (k/process! (fn [_ctx _k v] (swap! records conj v)) [])))) publish-a (partial mock/publish driver topic-a)] @@ -412,7 +411,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/select-key (fn [[k v]] + (k/select-key (fn [[k _v]] (inc k))) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -565,7 +564,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter (fn [[k v]] + (k/filter (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -585,7 +584,7 @@ (with-open [driver (mock/build-driver (fn [builder] (-> (k/ktable builder topic-a) - (k/filter-not (fn [[k v]] + (k/filter-not (fn [[_k v]] (not (zero? v)))) (k/to-kstream) (k/to topic-b))))] @@ -912,7 +911,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce + topic-a) (k/to-kstream) (k/to topic-b)))) @@ -934,7 +933,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/reduce +) (k/to-kstream) (k/to topic-b)))) @@ -956,9 +955,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v)) + (fn [acc [_k v]] (+ acc v)) topic-a) (k/to-kstream) (k/to topic-b)))) @@ -1007,9 +1006,9 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/aggregate (constantly -10) - (fn [acc [k v]] (+ acc v))) + (fn [acc [_k v]] (+ acc v))) (k/to-kstream) (k/to topic-b)))) publish (partial mock/publish driver topic-a)] @@ -1033,7 +1032,7 @@ (-> in (k/group-by-key) (k/aggregate (constantly []) - (fn [acc [k v]] + (fn [acc [_k v]] (concat [(last acc)] [v])) (assoc topic-in @@ -1060,7 +1059,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-time (TimeWindows/of 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1108,7 +1107,7 @@ driver (mock/build-driver (fn [builder] (-> builder (k/kstream topic-a) - (k/group-by (fn [[k v]] (long (/ k 10))) topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) (k/window-by-session (SessionWindows/with 1000)) (k/reduce + topic-a) (k/to-kstream) @@ -1138,10 +1137,10 @@ (k/group-by-key) (k/window-by-session (SessionWindows/with 1000)) (k/aggregate (constantly 0) - (fn [agg [k v]] + (fn [agg [_k v]] (+ agg v)) ;; Merger - (fn [k agg1 agg2] + (fn [_k agg1 agg2] (+ agg1 agg2)) topic-a) (k/to-kstream) @@ -1176,8 +1175,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v)) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v)) topic-b) (k/to-kstream) (k/to topic-b))))] @@ -1233,8 +1232,8 @@ [(long (/ k 10)) v]) topic-a) (k/aggregate (constantly 0) - (fn [acc [k v]] (+ acc v)) - (fn [acc [k v]] (- acc v))) + (fn [acc [_k v]] (+ acc v)) + (fn [acc [_k v]] (- acc v))) (k/to-kstream) (k/to topic-b))))] (let [publish (partial mock/publish driver topic-a)] @@ -1337,7 +1336,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/join-global k-table - (fn [[k v]] + (fn [[k _v]] k) +) (k/to topic-c)))))] @@ -1360,7 +1359,7 @@ k-table (k/global-ktable builder topic-b)] (-> k-stream (k/left-join-global k-table - (fn [[k v]] + (fn [[k _v]] k) safe-add) (k/to topic-c)))))] @@ -1393,7 +1392,7 @@ (publisher 100 {:val 10}) - (let [[[k v]] (mock/get-keyvals driver output-t)] + (let [[[_k v]] (mock/get-keyvals driver output-t)] (is (= 11 (:new-val v))) (is (= "input-topic" (:topic v))))))))) diff --git a/test/jackdaw/test/commands/base_test.clj b/test/jackdaw/test/commands/base_test.clj index 168b5b31..2b9ceabb 100644 --- a/test/jackdaw/test/commands/base_test.clj +++ b/test/jackdaw/test/commands/base_test.clj @@ -2,7 +2,7 @@ (:require [jackdaw.test.commands.base :as cmd] [clojure.pprint :as pprint] - [clojure.test :refer :all])) + [clojure.test :refer [deftest is testing]])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/commands/write_test.clj b/test/jackdaw/test/commands/write_test.clj index 99e3db12..51eae7d1 100644 --- a/test/jackdaw/test/commands/write_test.clj +++ b/test/jackdaw/test/commands/write_test.clj @@ -4,8 +4,7 @@ [jackdaw.test.transports :as trns] [jackdaw.test.transports.kafka] [jackdaw.test.serde :as serde] - [jackdaw.test :refer [test-machine]] - [clojure.test :refer :all]) + [clojure.test :refer [deftest is testing]]) (:import [clojure.lang ExceptionInfo])) @@ -115,7 +114,6 @@ :partition-count 5 :key-serde :long :value-serde :json}) - opts {} msg {:id 1 :a 2 :b 3 :payload "yolo"}] (testing "partition must be >= 0" diff --git a/test/jackdaw/test/commands_test.clj b/test/jackdaw/test/commands_test.clj index a5f72103..e2ad492d 100644 --- a/test/jackdaw/test/commands_test.clj +++ b/test/jackdaw/test/commands_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.commands-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.test.commands :as cmd])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/fixtures_test.clj b/test/jackdaw/test/fixtures_test.clj index 0107825e..5825623d 100644 --- a/test/jackdaw/test/fixtures_test.clj +++ b/test/jackdaw/test/fixtures_test.clj @@ -1,10 +1,9 @@ (ns jackdaw.test.fixtures-test (:require - [clojure.java.io :as io] - [clojure.test :refer :all] - [jackdaw.test.fixtures :refer :all]) + [clojure.test :refer [deftest is]] + [jackdaw.test.fixtures :refer [list-topics reset-application-fixture topic-fixture with-fixtures]]) (:import - (org.apache.kafka.clients.admin AdminClient NewTopic))) + (org.apache.kafka.clients.admin AdminClient))) (set! *warn-on-reflection* false) @@ -35,7 +34,6 @@ (is (topic-exists? client topic-foo))))) (defn test-resetter - "" {:style/indent 1} [{:keys [app-config reset-params reset-fn]} assertion-fn] (let [reset-args (atom []) @@ -82,7 +80,7 @@ (.write *err* "helpful error message\n") (.write *out* "essential application info\n") 1)} - (fn [{:keys [resetter reset-args error-data]}] + (fn [{:keys [resetter error-data]}] (is (instance? kafka.tools.StreamsResetter resetter)) (is (= 1 (:status error-data))) (is (= "helpful error message\n" (:err error-data))) diff --git a/test/jackdaw/test/journal_test.clj b/test/jackdaw/test/journal_test.clj index 4932cfe7..4221f2a7 100644 --- a/test/jackdaw/test/journal_test.clj +++ b/test/jackdaw/test/journal_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.journal-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.test.journal :as jrnl])) (set! *warn-on-reflection* false) diff --git a/test/jackdaw/test/middleware_test.clj b/test/jackdaw/test/middleware_test.clj index 565bc4bd..b11ffdf8 100644 --- a/test/jackdaw/test/middleware_test.clj +++ b/test/jackdaw/test/middleware_test.clj @@ -1,6 +1,6 @@ (ns jackdaw.test.middleware-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.tools.logging :as log] [jackdaw.test.middleware :as middle] [jackdaw.test.transports :as trns] @@ -16,12 +16,9 @@ (defn with-identity-transport [{:keys [test-id transport]} f] (with-open [machine (jd.test/test-machine (transport))] - (when test-id - (log/info "begin" test-id)) - - (let [result (f machine)] - (when test-id - (log/info "end" test-id))))) + (when test-id (log/info "begin" test-id)) + (f machine) + (when test-id (log/info "end" test-id)))) (deftest test-with-status diff --git a/test/jackdaw/test/transports/kafka_test.clj b/test/jackdaw/test/transports/kafka_test.clj index 66d358e1..aa100db5 100644 --- a/test/jackdaw/test/transports/kafka_test.clj +++ b/test/jackdaw/test/transports/kafka_test.clj @@ -1,16 +1,14 @@ (ns jackdaw.test.transports.kafka-test (:require [clojure.tools.logging :as log] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.fixtures :as fix] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.serde :as serde] [jackdaw.test.transports.kafka] - [manifold.stream :as s]) - (:import - (java.util Properties))) + [manifold.stream :as s])) (set! *warn-on-reflection* false) @@ -83,7 +81,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -109,7 +106,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/mock_test.clj b/test/jackdaw/test/transports/mock_test.clj index 59d7b817..05115a4a 100644 --- a/test/jackdaw/test/transports/mock_test.clj +++ b/test/jackdaw/test/transports/mock_test.clj @@ -1,9 +1,9 @@ (ns jackdaw.test.transports.mock-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.tools.logging :as log] [jackdaw.streams :as k] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test :as jd.test] [jackdaw.test.transports :as trns] [jackdaw.test.serde :as serde] @@ -74,12 +74,12 @@ (deftest test-driver-closed-after-use (let [driver-closed? (atom false) driver (reify java.io.Closeable - (close [this] + (close [_this] (reset! driver-closed? true))) transport (trns/transport {:type :mock :driver driver :topics {}})] - (with-open [machine (jd.test/test-machine transport)] + (with-open [_machine (jd.test/test-machine transport)] (is (not @driver-closed?))) (is @driver-closed?))) @@ -90,7 +90,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -112,7 +111,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] diff --git a/test/jackdaw/test/transports/rest_proxy_test.clj b/test/jackdaw/test/transports/rest_proxy_test.clj index 92e169c7..fd0b916a 100644 --- a/test/jackdaw/test/transports/rest_proxy_test.clj +++ b/test/jackdaw/test/transports/rest_proxy_test.clj @@ -2,19 +2,17 @@ (:require [byte-streams :as bs] [clojure.tools.logging :as log] - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [clojure.data.json :as json] [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] - [jackdaw.test.journal :refer [with-journal watch-for]] + [jackdaw.test.journal :refer [watch-for]] [jackdaw.test.transports :as trns] [jackdaw.test.transports.rest-proxy :as proxy] [manifold.stream :as s] - [manifold.deferred :as d]) - (:import - (java.util Properties))) + [manifold.deferred :as d])) (set! *warn-on-reflection* false) @@ -103,7 +101,6 @@ (let [msg {:id 1 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -129,7 +126,6 @@ (let [msg {:id 2 :payload "foo"} topic test-in messages (get-in t [:producer :messages]) - serdes (get-in t [:serdes]) ack (promise) msg-key (:id msg)] @@ -171,21 +167,21 @@ (deftest test-rest-proxy-group-config (let [http-reqs (atom [])] (binding [proxy/*http-client* {:post (mock-http-client http-reqs)}] - (let [client (-> (proxy/rest-proxy-client (-> (rest-proxy-config "test-group-config") + (let [_client (-> (proxy/rest-proxy-client (-> (rest-proxy-config "test-group-config") (assoc :group-config {:auto.offset.reset "earliest" :fetch.min.bytes 100 :consumer.fetch.timeout.ms 200}))) - (proxy/with-consumer))] - (let [[url options] (first @http-reqs)] - (is (= "http://localhost:8082/consumers/test-group-config" url)) - (is (= {"Accept" "application/vnd.kafka.v2+json" - "Content-Type" "application/vnd.kafka.v2+json"} - (:headers options))) - (is (= {"auto.offset.reset" "earliest" - "fetch.min.bytes" 100 - "consumer.fetch.timeout.ms" 200} - (-> (:body options) - (json/read-str) - (select-keys ["auto.offset.reset" - "fetch.min.bytes" - "consumer.fetch.timeout.ms"]))))))))) + (proxy/with-consumer)) + [url options] (first @http-reqs)] + (is (= "http://localhost:8082/consumers/test-group-config" url)) + (is (= {"Accept" "application/vnd.kafka.v2+json" + "Content-Type" "application/vnd.kafka.v2+json"} + (:headers options))) + (is (= {"auto.offset.reset" "earliest" + "fetch.min.bytes" 100 + "consumer.fetch.timeout.ms" 200} + (-> (:body options) + (json/read-str) + (select-keys ["auto.offset.reset" + "fetch.min.bytes" + "consumer.fetch.timeout.ms"])))))))) diff --git a/test/jackdaw/test_test.clj b/test/jackdaw/test_test.clj index 481271ac..b188b145 100644 --- a/test/jackdaw/test_test.clj +++ b/test/jackdaw/test_test.clj @@ -1,16 +1,13 @@ (ns jackdaw.test-test (:require - [clojure.test :refer :all] + [clojure.test :refer [deftest is testing]] [jackdaw.streams :as k] [jackdaw.test :as jd.test] [jackdaw.test.commands :as cmd] [jackdaw.test.fixtures :as fix] [jackdaw.test.serde :as serde] [jackdaw.test.transports :as trns] - [jackdaw.test.middleware :refer [with-status]]) - (:import - (java.util Properties) - (org.apache.kafka.streams TopologyTestDriver))) + [jackdaw.test.middleware :refer [with-status]])) (set! *warn-on-reflection* false) @@ -60,7 +57,7 @@ (deftest test-run-test (testing "the run test machinery" - (let [m {:executor (-> (fn [m c] + (let [m {:executor (-> (fn [_machine c] (let [[cmd & params] c] (apply ({:min (fn [v] {:result {:result (apply min v)}}) :max (fn [v] {:result {:result (apply max v)}}) @@ -73,7 +70,7 @@ :journal (atom {})}] (testing "works properly" - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:max [1 2 3]] [:is-1 1]])] @@ -91,7 +88,7 @@ (testing "execution stops on an unknown command" (is (thrown? NullPointerException - (let [{:keys [results journal]} + (let [{:keys [results]} (jd.test/run-test m [[:min [1 2 3]] [:foo 2] [:max [1 2 3]]])] @@ -219,17 +216,17 @@ [in out] (fn [builder] (let [in (-> (k/kstream builder in) - (k/map (fn [[k v]] + (k/map (fn [_record] (throw (ex-info "bad topology" {})))))] (k/to in out) builder))) (defn bad-key-fn - [msg] + [_msg] (throw (ex-info "bad-key-fn" {}))) (defn bad-watch-fn - [journal] + [_journal] (throw (ex-info "bad-watch-fn" {}))) (deftest test-machine-happy-path