From 6aecd547a2152be65d4c8c4aaa4169f408b0b7b6 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Tue, 13 Nov 2018 11:57:45 -0800 Subject: [PATCH] Be explicit with tuples for %s formatting Fix #1633 --- kafka/admin/kafka.py | 2 +- kafka/client.py | 2 +- kafka/client_async.py | 2 +- kafka/consumer/fetcher.py | 14 +++++++------- kafka/consumer/group.py | 2 +- kafka/consumer/simple.py | 2 +- kafka/consumer/subscription_state.py | 2 +- kafka/coordinator/consumer.py | 6 +++--- kafka/metrics/metrics.py | 2 +- kafka/metrics/stats/percentiles.py | 2 +- kafka/metrics/stats/rate.py | 2 +- kafka/metrics/stats/sampled_stat.py | 2 +- kafka/metrics/stats/sensor.py | 2 +- kafka/producer/base.py | 4 ++-- kafka/producer/future.py | 2 +- kafka/producer/kafka.py | 10 +++++----- kafka/producer/keyed.py | 2 +- kafka/producer/record_accumulator.py | 6 +++--- kafka/producer/simple.py | 2 +- kafka/protocol/legacy.py | 2 +- kafka/protocol/message.py | 4 ++-- kafka/protocol/parser.py | 2 +- kafka/record/legacy_records.py | 2 +- test/fixtures.py | 8 ++++---- test/test_metrics.py | 2 +- test/test_producer.py | 2 +- test/testutil.py | 2 +- 27 files changed, 46 insertions(+), 46 deletions(-) diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index fbbbcc2a2..98bac5d53 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -166,7 +166,7 @@ def __init__(self, **configs): log.debug("Starting Kafka administration interface") extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) diff --git a/kafka/client.py b/kafka/client.py index 789d4da3d..148cae0d8 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -174,7 +174,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): return decoder_fn(future.value) - raise KafkaUnavailableError('All servers failed to process request: %s' % hosts) + raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,)) def _payloads_by_broker(self, payloads): payloads_by_broker = collections.defaultdict(list) diff --git a/kafka/client_async.py b/kafka/client_async.py index c3fcc7995..8e83b8506 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -355,7 +355,7 @@ def _maybe_connect(self, node_id): conn = self._conns.get(node_id) if conn is None: - assert broker, 'Broker id %s not in current metadata' % node_id + assert broker, 'Broker id %s not in current metadata' % (node_id,) log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7d58b7caa..36388319e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -298,7 +298,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by timestamps in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) def fetched_records(self, max_records=None): """Returns previously fetched records and updates consumed offsets. 
@@ -911,7 +911,7 @@ def record(self, partition, num_bytes, num_records): class FetchManagerMetrics(object): def __init__(self, metrics, prefix): self.metrics = metrics - self.group_name = '%s-fetch-manager-metrics' % prefix + self.group_name = '%s-fetch-manager-metrics' % (prefix,) self.bytes_fetched = metrics.sensor('bytes-fetched') self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, @@ -955,15 +955,15 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records): bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, - 'The average number of bytes fetched per request for topic %s' % topic, + 'The average number of bytes fetched per request for topic %s' % (topic,), metric_tags), Avg()) bytes_fetched.add(self.metrics.metric_name('fetch-size-max', self.group_name, - 'The maximum number of bytes fetched per request for topic %s' % topic, + 'The maximum number of bytes fetched per request for topic %s' % (topic,), metric_tags), Max()) bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', self.group_name, - 'The average number of bytes consumed per second for topic %s' % topic, + 'The average number of bytes consumed per second for topic %s' % (topic,), metric_tags), Rate()) bytes_fetched.record(num_bytes) @@ -976,10 +976,10 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records): records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, - 'The average number of records in each request for topic %s' % topic, + 'The average number of records in each request for topic %s' % (topic,), metric_tags), Avg()) records_fetched.add(self.metrics.metric_name('records-consumed-rate', self.group_name, - 'The average number of records consumed per second for topic %s' % topic, + 'The average number of records consumed per second for topic %s' % (topic,), metric_tags), Rate()) records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 279cce033..8727de791 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -309,7 +309,7 @@ def __init__(self, *topics, **configs): # Only check for extra config keys in top-level class extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index b60a5865b..a6a64a58f 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -247,7 +247,7 @@ def seek(self, offset, whence=None, partition=None): self.offsets[resp.partition] = \ resp.offsets[0] + deltas[resp.partition] else: - raise ValueError('Unexpected value for `whence`, %d' % whence) + raise ValueError('Unexpected value for `whence`, %d' % (whence,)) # Reset queue and fetch offsets since they are invalid self.fetch_offsets = self.offsets.copy() diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 10d722ec5..4b0b275c1 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -247,7 +247,7 @@ def assign_from_subscribed(self, assignments): for tp in assignments: if tp.topic not in self.subscription: - raise ValueError("Assigned partition %s for non-subscribed topic." 
% str(tp)) + raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) # after rebalancing, we always reinitialize the assignment state self.assignment.clear() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 647a6b585..14eee0fdc 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -216,7 +216,7 @@ def _on_join_complete(self, generation, member_id, protocol, self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) - assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol + assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -297,7 +297,7 @@ def time_to_next_poll(self): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy + assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: @@ -804,7 +804,7 @@ def _maybe_auto_commit_offsets_async(self): class ConsumerCoordinatorMetrics(object): def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix + self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,) self.commit_latency = metrics.sensor('commit-latency') self.commit_latency.add(metrics.metric_name( diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py index f2e99edc9..2c53488ff 100644 --- a/kafka/metrics/metrics.py +++ b/kafka/metrics/metrics.py @@ -225,7 +225,7 @@ def register_metric(self, metric): with self._lock: if metric.metric_name in self.metrics: raise ValueError('A metric named "%s" already exists, cannot' - ' register another one.' % metric.metric_name) + ' register another one.' 
% (metric.metric_name,)) self.metrics[metric.metric_name] = metric for reporter in self._reporters: reporter.metric_change(metric) diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py index b55c5accc..6d702e80f 100644 --- a/kafka/metrics/stats/percentiles.py +++ b/kafka/metrics/stats/percentiles.py @@ -27,7 +27,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, ' to be 0.0.') self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) else: - ValueError('Unknown bucket type: %s' % bucketing) + ValueError('Unknown bucket type: %s' % (bucketing,)) def stats(self): measurables = [] diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py index 810c5435b..68393fbf7 100644 --- a/kafka/metrics/stats/rate.py +++ b/kafka/metrics/stats/rate.py @@ -101,7 +101,7 @@ def convert(self, time_ms): elif self._unit == TimeUnit.DAYS: return time_ms / (24.0 * 60.0 * 60.0 * 1000.0) else: - raise ValueError('Unknown unit: %s' % self._unit) + raise ValueError('Unknown unit: %s' % (self._unit,)) class SampledTotal(AbstractSampledStat): diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py index c41b14bbc..50301120a 100644 --- a/kafka/metrics/stats/sampled_stat.py +++ b/kafka/metrics/stats/sampled_stat.py @@ -73,7 +73,7 @@ def purge_obsolete_samples(self, config, now): sample.reset(now) def _advance(self, config, time_ms): - self._current = (self._current + 1) % config.samples + self._current = (self._current + 1) % config.samples  # integer modulo over the sample count, not string formatting if self._current >= len(self._samples): sample = self.new_sample(time_ms) self._samples.append(sample) diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py index 73a46651f..571723f97 100644 --- a/kafka/metrics/stats/sensor.py +++ b/kafka/metrics/stats/sensor.py @@ -35,7 +35,7 @@ def _check_forest(self, sensors): """Validate that this sensor doesn't end up referencing itself.""" if self in sensors: raise ValueError('Circular dependency in sensors: %s is its own' - 'parent.' % self.name) + 'parent.' % (self.name,)) sensors.add(self) for parent in self._parents: parent._check_forest(sensors) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 1da74c841..b32396634 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -316,7 +316,7 @@ def __init__(self, client, if codec is None: codec = CODEC_NONE elif codec not in ALL_CODECS: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) + raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) self.codec = codec self.codec_compresslevel = codec_compresslevel @@ -419,7 +419,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs): raise AsyncProducerQueueFull( msg[idx:], 'Producer async queue overfilled. ' - 'Current queue size %d.' % self.queue.qsize()) + 'Current queue size %d.' % (self.queue.qsize(),)) resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 1c5d6d7bf..f67db0979 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -59,7 +59,7 @@ def _produce_success(self, offset_and_timestamp): def get(self, timeout=None): if not self.is_done and not self._produce_future.wait(timeout): raise Errors.KafkaTimeoutError( - "Timeout after waiting for %s secs." % timeout) + "Timeout after waiting for %s secs." 
% (timeout,)) assert self.is_done if self.failed(): raise self.exception # pylint: disable-msg=raising-bad-type diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 45bb05834..685c3f9c1 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -340,11 +340,11 @@ def __init__(self, **configs): self.config[key] = configs.pop(key) # Only check for extra config keys in top-level class - assert not configs, 'Unrecognized configs: %s' % configs + assert not configs, 'Unrecognized configs: %s' % (configs,) if self.config['client_id'] is None: self.config['client_id'] = 'kafka-python-producer-%s' % \ - PRODUCER_CLIENT_ID_SEQUENCE.increment() + (PRODUCER_CLIENT_ID_SEQUENCE.increment(),) if self.config['acks'] == 'all': self.config['acks'] = -1 @@ -633,12 +633,12 @@ def _ensure_valid_record_size(self, size): raise Errors.MessageSizeTooLargeError( "The message is %d bytes when serialized which is larger than" " the maximum request size you have configured with the" - " max_request_size configuration" % size) + " max_request_size configuration" % (size,)) if size > self.config['buffer_memory']: raise Errors.MessageSizeTooLargeError( "The message is %d bytes when serialized which is larger than" " the total memory buffer you have configured with the" - " buffer_memory configuration." % size) + " buffer_memory configuration." % (size,)) def _wait_on_metadata(self, topic, max_wait): """ @@ -679,7 +679,7 @@ def _wait_on_metadata(self, topic, max_wait): elapsed = time.time() - begin if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( - "Failed to update metadata after %.1f secs." % max_wait) + "Failed to update metadata after %.1f secs." % (max_wait,)) elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) else: diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 62bb733fc..3ba92166e 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -46,4 +46,4 @@ def send(self, topic, key, msg): return self.send_messages(topic, key, msg) def __repr__(self): - return '<KeyedProducer batch=%s>' % self.async_send + return '<KeyedProducer batch=%s>' % (self.async_send,) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 728bf18e6..eeb928d70 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -102,11 +102,11 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full) error = None if not self.in_retry() and is_full and timeout < since_append: - error = "%d seconds have passed since last append" % since_append + error = "%d seconds have passed since last append" % (since_append,) elif not self.in_retry() and timeout < since_ready: - error = "%d seconds have passed since batch creation plus linger time" % since_ready + error = "%d seconds have passed since batch creation plus linger time" % (since_ready,) elif self.in_retry() and timeout < since_backoff: - error = "%d seconds have passed since last attempt plus backoff time" % since_backoff + error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,) if error: self.records.close() diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index e06e65954..f334a49d3 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -51,4 +51,4 @@ def send_messages(self, topic, *msg): ) def __repr__(self): - return '<SimpleProducer batch=%s>' % self.async_send + return '<SimpleProducer batch=%s>' % (self.async_send,) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 7dd258032..2e8f5bc17 
100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None) elif codec == CODEC_SNAPPY: return [create_snappy_message(messages, key)] else: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) + raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 19dcbd9de..31527bf63 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -77,7 +77,7 @@ def _encode_self(self, recalc_crc=True): elif version == 0: fields = (self.crc, self.magic, self.attributes, self.key, self.value) else: - raise ValueError('Unrecognized message version: %s' % version) + raise ValueError('Unrecognized message version: %s' % (version,)) message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message @@ -143,7 +143,7 @@ def __hash__(self): class PartialMessage(bytes): def __repr__(self): - return 'PartialMessage(%s)' % self + return 'PartialMessage(%s)' % (self,) class MessageSet(AbstractType): diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 4d77bb32d..a99b3ae68 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -136,7 +136,7 @@ def _process_response(self, read_buffer): raise Errors.CorrelationIdError( 'No in-flight-request found for server response' ' with correlation ID %d' - % recv_correlation_id) + % (recv_correlation_id,)) (correlation_id, request) = self.in_flight_requests.popleft() diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 1bdba8152..bb6c21c2d 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -254,7 +254,7 @@ def __iter__(self): # There should only ever be a single layer of compression assert not attrs & self.CODEC_MASK, ( 'MessageSet at offset %d appears double-compressed. This ' - 'should not happen -- check your producers!' % offset) + 'should not happen -- check your producers!' % (offset,)) # When magic value is greater than 0, the timestamp # of a compressed message depends on the diff --git a/test/fixtures.py b/test/fixtures.py index 76e3071f3..6f7fc3f72 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -102,7 +102,7 @@ def kafka_run_class_args(cls, *args): def kafka_run_class_env(self): env = os.environ.copy() env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \ - self.test_resource("log4j.properties") + (self.test_resource("log4j.properties"),) return env @classmethod @@ -110,7 +110,7 @@ def render_template(cls, source_file, target_file, binding): log.info('Rendering %s from template %s', target_file.strpath, source_file) with open(source_file, "r") as handle: template = handle.read() - assert len(template) > 0, 'Empty template %s' % source_file + assert len(template) > 0, 'Empty template %s' % (source_file,) with open(target_file.strpath, "w") as handle: handle.write(template.format(**binding)) handle.flush() @@ -257,7 +257,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, # TODO: checking for port connection would be better than scanning logs # until then, we need the pattern to work across all supported broker versions # The logging format changed slightly in 1.0.0 - self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id + self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? 
started" % (broker_id,) self.zookeeper = zookeeper self.zk_chroot = zk_chroot @@ -291,7 +291,7 @@ def _create_zk_chroot(self): "%s:%d" % (self.zookeeper.host, self.zookeeper.port), "create", - "/%s" % self.zk_chroot, + "/%s" % (self.zk_chroot,), "kafka-python") env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/test/test_metrics.py b/test/test_metrics.py index 8d35f5534..308ea5831 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -469,7 +469,7 @@ def test_reporter(metrics): for key in list(expected.keys()): metrics = expected.pop(key) - expected['foo.%s' % key] = metrics + expected['foo.%s' % (key,)] = metrics assert expected == foo_reporter.snapshot() diff --git a/test/test_producer.py b/test/test_producer.py index 16da61898..77ebcb225 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -65,7 +65,7 @@ def test_end_to_end(kafka_broker, compression): except StopIteration: break - assert msgs == set(['msg %d' % i for i in range(messages)]) + assert msgs == set(['msg %d' % (i,) for i in range(messages)]) consumer.close() diff --git a/test/testutil.py b/test/testutil.py index 6f6cafb5e..a8227cfb6 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -32,7 +32,7 @@ def construct_lambda(s): op_str = s[0:2] # >= <= v_str = s[2:] else: - raise ValueError('Unrecognized kafka version / operator: %s' % s) + raise ValueError('Unrecognized kafka version / operator: %s' % (s,)) op_map = { '=': operator.eq,
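
A minimal sketch of the pitfall this patch guards against (hypothetical values, not code from the repository): when the right-hand operand of %s formatting is itself a tuple, Python treats its elements as separate format arguments, so wrapping the value in a one-element tuple is what makes it render as a single value.

    # Hypothetical example of the bare-tuple pitfall behind this change.
    extra_configs = ('retries', 'linger_ms')

    # Bare tuple: two elements, but only one %s placeholder.
    # "Unrecognized configs: %s" % extra_configs
    # -> TypeError: not all arguments converted during string formatting

    # Explicit one-element tuple: the whole value fills the single %s.
    print("Unrecognized configs: %s" % (extra_configs,))
    # -> Unrecognized configs: ('retries', 'linger_ms')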