Skip to content

Commit

Permalink
fix(confluent-kafka): Fixed some lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
javferrod committed Jun 27, 2023
1 parent 7a86555 commit 4c9cce6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def __init__(self, config):
# This method is deliberately implemented in order to allow wrapt to wrap this function
def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
return super().poll(timeout)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)
Expand Down Expand Up @@ -182,9 +182,7 @@ def committed(self, partitions, timeout=-1):
def commit(self, *args, **kwargs):
return self._consumer.commit(*args, **kwargs)

def consume(
self, *args, **kwargs
):
def consume(self, *args, **kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
self._consumer.consume, self, self._tracer, args, kwargs,
)
Expand Down Expand Up @@ -281,7 +279,7 @@ def _inner_wrap_poll(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_poll(
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_consume(func, instance, args, kwargs):
return ConfluentKafkaInstrumentor.wrap_consume(
func, instance, self._tracer, args, kwargs
Expand All @@ -298,7 +296,7 @@ def _inner_wrap_consume(func, instance, args, kwargs):
"poll",
_inner_wrap_poll,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"consume",
Expand Down Expand Up @@ -373,7 +371,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
)

return record

@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
Expand All @@ -399,7 +397,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
records[0].topic(),
operation=MessagingOperationValues.PROCESS,
)

instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def consume(self, num_messages=1, *args, **kwargs):
def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
messages = self._queue[:num_messages]
self._queue = self._queue[num_messages:]
return messages

def poll(self, timeout=None):
if len(self._queue) > 0:
return self._queue.pop(0)
else:
return None
return None


class MockedMessage:
Expand Down

0 comments on commit 4c9cce6

Please sign in to comment.