Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be explicit with tuples for %s formatting #1634

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def __init__(self, **configs):
log.debug("Starting Kafka administration interface")
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
Expand Down
2 changes: 1 addition & 1 deletion kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):

return decoder_fn(future.value)

raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,))

def _payloads_by_broker(self, payloads):
payloads_by_broker = collections.defaultdict(list)
Expand Down
2 changes: 1 addition & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _maybe_connect(self, node_id):
conn = self._conns.get(node_id)

if conn is None:
assert broker, 'Broker id %s not in current metadata' % node_id
assert broker, 'Broker id %s not in current metadata' % (node_id,)

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
Expand Down
14 changes: 7 additions & 7 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
remaining_ms = timeout_ms - elapsed_ms

raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % timeout_ms)
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))

def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
Expand Down Expand Up @@ -911,7 +911,7 @@ def record(self, partition, num_bytes, num_records):
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
self.group_name = '%s-fetch-manager-metrics' % prefix
self.group_name = '%s-fetch-manager-metrics' % (prefix,)

self.bytes_fetched = metrics.sensor('bytes-fetched')
self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
Expand Down Expand Up @@ -955,15 +955,15 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
'The average number of bytes fetched per request for topic %s' % topic,
'The average number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Avg())
bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
self.group_name,
'The maximum number of bytes fetched per request for topic %s' % topic,
'The maximum number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Max())
bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
self.group_name,
'The average number of bytes consumed per second for topic %s' % topic,
'The average number of bytes consumed per second for topic %s' % (topic,),
metric_tags), Rate())
bytes_fetched.record(num_bytes)

Expand All @@ -976,10 +976,10 @@ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
'The average number of records in each request for topic %s' % topic,
'The average number of records in each request for topic %s' % (topic,),
metric_tags), Avg())
records_fetched.add(self.metrics.metric_name('records-consumed-rate',
self.group_name,
'The average number of records consumed per second for topic %s' % topic,
'The average number of records consumed per second for topic %s' % (topic,),
metric_tags), Rate())
records_fetched.record(num_records)
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def __init__(self, *topics, **configs):
# Only check for extra config keys in top-level class
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def seek(self, offset, whence=None, partition=None):
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError('Unexpected value for `whence`, %d' % whence)
raise ValueError('Unexpected value for `whence`, %d' % (whence,))

# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def assign_from_subscribed(self, assignments):

for tp in assignments:
if tp.topic not in self.subscription:
raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))

# after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
Expand Down
6 changes: 3 additions & 3 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def _on_join_complete(self, generation, member_id, protocol,
self._assignment_snapshot = None

assignor = self._lookup_assignor(protocol)
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

Expand Down Expand Up @@ -297,7 +297,7 @@ def time_to_next_poll(self):

def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
Expand Down Expand Up @@ -804,7 +804,7 @@ def _maybe_auto_commit_offsets_async(self):
class ConsumerCoordinatorMetrics(object):
def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)

self.commit_latency = metrics.sensor('commit-latency')
self.commit_latency.add(metrics.metric_name(
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def register_metric(self, metric):
with self._lock:
if metric.metric_name in self.metrics:
raise ValueError('A metric named "%s" already exists, cannot'
' register another one.' % metric.metric_name)
' register another one.' % (metric.metric_name,))
self.metrics[metric.metric_name] = metric
for reporter in self._reporters:
reporter.metric_change(metric)
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/percentiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
' to be 0.0.')
self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
else:
ValueError('Unknown bucket type: %s' % bucketing)
ValueError('Unknown bucket type: %s' % (bucketing,))

def stats(self):
measurables = []
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def convert(self, time_ms):
elif self._unit == TimeUnit.DAYS:
return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
else:
raise ValueError('Unknown unit: %s' % self._unit)
raise ValueError('Unknown unit: %s' % (self._unit,))


class SampledTotal(AbstractSampledStat):
Expand Down
2 changes: 1 addition & 1 deletion kafka/metrics/stats/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
'parent.' % self.name)
'parent.' % (self.name,))
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def __init__(self, client,
if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))

self.codec = codec
self.codec_compresslevel = codec_compresslevel
Expand Down Expand Up @@ -419,7 +419,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
'Current queue size %d.' % self.queue.qsize())
'Current queue size %d.' % (self.queue.qsize(),))
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _produce_success(self, offset_and_timestamp):
def get(self, timeout=None):
if not self.is_done and not self._produce_future.wait(timeout):
raise Errors.KafkaTimeoutError(
"Timeout after waiting for %s secs." % timeout)
"Timeout after waiting for %s secs." % (timeout,))
assert self.is_done
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
Expand Down
10 changes: 5 additions & 5 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,11 @@ def __init__(self, **configs):
self.config[key] = configs.pop(key)

# Only check for extra config keys in top-level class
assert not configs, 'Unrecognized configs: %s' % configs
assert not configs, 'Unrecognized configs: %s' % (configs,)

if self.config['client_id'] is None:
self.config['client_id'] = 'kafka-python-producer-%s' % \
PRODUCER_CLIENT_ID_SEQUENCE.increment()
(PRODUCER_CLIENT_ID_SEQUENCE.increment(),)

if self.config['acks'] == 'all':
self.config['acks'] = -1
Expand Down Expand Up @@ -633,12 +633,12 @@ def _ensure_valid_record_size(self, size):
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the maximum request size you have configured with the"
" max_request_size configuration" % size)
" max_request_size configuration" % (size,))
if size > self.config['buffer_memory']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the total memory buffer you have configured with the"
" buffer_memory configuration." % size)
" buffer_memory configuration." % (size,))

def _wait_on_metadata(self, topic, max_wait):
"""
Expand Down Expand Up @@ -679,7 +679,7 @@ def _wait_on_metadata(self, topic, max_wait):
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %.1f secs." % max_wait)
"Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/keyed.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ def send(self, topic, key, msg):
return self.send_messages(topic, key, msg)

def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async_send
return '<KeyedProducer batch=%s>' % (self.async_send,)
6 changes: 3 additions & 3 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)

error = None
if not self.in_retry() and is_full and timeout < since_append:
error = "%d seconds have passed since last append" % since_append
error = "%d seconds have passed since last append" % (since_append,)
elif not self.in_retry() and timeout < since_ready:
error = "%d seconds have passed since batch creation plus linger time" % since_ready
error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
elif self.in_retry() and timeout < since_backoff:
error = "%d seconds have passed since last attempt plus backoff time" % since_backoff
error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)

if error:
self.records.close()
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
)

def __repr__(self):
return '<SimpleProducer batch=%s>' % self.async_send
return '<SimpleProducer batch=%s>' % (self.async_send,)
2 changes: 1 addition & 1 deletion kafka/protocol/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None)
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
4 changes: 2 additions & 2 deletions kafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _encode_self(self, recalc_crc=True):
elif version == 0:
fields = (self.crc, self.magic, self.attributes, self.key, self.value)
else:
raise ValueError('Unrecognized message version: %s' % version)
raise ValueError('Unrecognized message version: %s' % (version,))
message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
Expand Down Expand Up @@ -143,7 +143,7 @@ def __hash__(self):

class PartialMessage(bytes):
def __repr__(self):
return 'PartialMessage(%s)' % self
return 'PartialMessage(%s)' % (self,)


class MessageSet(AbstractType):
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _process_response(self, read_buffer):
raise Errors.CorrelationIdError(
'No in-flight-request found for server response'
' with correlation ID %d'
% recv_correlation_id)
% (recv_correlation_id,))

(correlation_id, request) = self.in_flight_requests.popleft()

Expand Down
2 changes: 1 addition & 1 deletion kafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def __iter__(self):
# There should only ever be a single layer of compression
assert not attrs & self.CODEC_MASK, (
'MessageSet at offset %d appears double-compressed. This '
'should not happen -- check your producers!' % offset)
'should not happen -- check your producers!' % (offset,))

# When magic value is greater than 0, the timestamp
# of a compressed message depends on the
Expand Down
8 changes: 4 additions & 4 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ def kafka_run_class_args(cls, *args):
def kafka_run_class_env(self):
env = os.environ.copy()
env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
self.test_resource("log4j.properties")
(self.test_resource("log4j.properties"),)
return env

@classmethod
def render_template(cls, source_file, target_file, binding):
log.info('Rendering %s from template %s', target_file.strpath, source_file)
with open(source_file, "r") as handle:
template = handle.read()
assert len(template) > 0, 'Empty template %s' % source_file
assert len(template) > 0, 'Empty template %s' % (source_file,)
with open(target_file.strpath, "w") as handle:
handle.write(template.format(**binding))
handle.flush()
Expand Down Expand Up @@ -257,7 +257,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
# TODO: checking for port connection would be better than scanning logs
# until then, we need the pattern to work across all supported broker versions
# The logging format changed slightly in 1.0.0
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)

self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
Expand Down Expand Up @@ -291,7 +291,7 @@ def _create_zk_chroot(self):
"%s:%d" % (self.zookeeper.host,
self.zookeeper.port),
"create",
"/%s" % self.zk_chroot,
"/%s" % (self.zk_chroot,),
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
Expand Down
2 changes: 1 addition & 1 deletion test/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def test_reporter(metrics):

for key in list(expected.keys()):
metrics = expected.pop(key)
expected['foo.%s' % key] = metrics
expected['foo.%s' % (key,)] = metrics
assert expected == foo_reporter.snapshot()


Expand Down
2 changes: 1 addition & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_end_to_end(kafka_broker, compression):
except StopIteration:
break

assert msgs == set(['msg %d' % i for i in range(messages)])
assert msgs == set(['msg %d' % (i,) for i in range(messages)])
consumer.close()


Expand Down
2 changes: 1 addition & 1 deletion test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def construct_lambda(s):
op_str = s[0:2] # >= <=
v_str = s[2:]
else:
raise ValueError('Unrecognized kafka version / operator: %s' % s)
raise ValueError('Unrecognized kafka version / operator: %s' % (s,))

op_map = {
'=': operator.eq,
Expand Down