diff --git a/aiokafka/protocol/message.py b/aiokafka/protocol/message.py index 6f3eee4f..31993fe6 100644 --- a/aiokafka/protocol/message.py +++ b/aiokafka/protocol/message.py @@ -13,7 +13,6 @@ zstd_decode, ) from aiokafka.errors import UnsupportedCodecError -from aiokafka.util import WeakMethod from .struct import Struct from .types import AbstractType, Bytes, Int8, Int32, Int64, Schema, UInt32 @@ -63,7 +62,6 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, timestamp=None self.attributes = attributes self.key = key self.value = value - self.encode = WeakMethod(self._encode_self) @property def timestamp_type(self): @@ -79,7 +77,7 @@ def timestamp_type(self): else: return 0 - def _encode_self(self, recalc_crc=True): + def encode(self, recalc_crc=True): version = self.magic if version == 1: fields = ( @@ -129,7 +127,7 @@ def decode(cls, data): def validate_crc(self): if self._validated_crc is None: - raw_msg = self._encode_self(recalc_crc=False) + raw_msg = self.encode(recalc_crc=False) self._validated_crc = crc32(raw_msg[4:]) if self.crc == self._validated_crc: return True @@ -170,7 +168,7 @@ def decompress(self): return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes)) def __hash__(self): - return hash(self._encode_self(recalc_crc=False)) + return hash(self.encode(recalc_crc=False)) class PartialMessage(bytes): diff --git a/aiokafka/protocol/struct.py b/aiokafka/protocol/struct.py index a87c53d9..ee99c75a 100644 --- a/aiokafka/protocol/struct.py +++ b/aiokafka/protocol/struct.py @@ -1,7 +1,5 @@ from io import BytesIO -from aiokafka.util import WeakMethod - from .abstract import AbstractType from .types import Schema @@ -25,19 +23,7 @@ def __init__(self, *args, **kwargs): ) ) - # overloading encode() to support both class and instance - # Without WeakMethod() this creates circular ref, which - # causes instances to "leak" to garbage - self.encode = WeakMethod(self._encode_self) - - @classmethod - def encode(cls, item): - bits = [] - for i, field in enumerate(cls.SCHEMA.fields): - bits.append(field.encode(item[i])) - return b"".join(bits) - - def _encode_self(self): + def encode(self): return self.SCHEMA.encode([self.__dict__[name] for name in self.SCHEMA.names]) @classmethod diff --git a/aiokafka/util.py b/aiokafka/util.py index 1389bbd2..875d774b 100644 --- a/aiokafka/util.py +++ b/aiokafka/util.py @@ -2,9 +2,7 @@ import asyncio import os -import weakref from asyncio import AbstractEventLoop -from types import MethodType from typing import ( Any, Awaitable, @@ -104,38 +102,3 @@ def get_running_loop() -> asyncio.AbstractEventLoop: INTEGER_MAX_VALUE = 2**31 - 1 INTEGER_MIN_VALUE = -(2**31) - - -class WeakMethod: - """ - Callable that weakly references a method and the object it is bound to. It - is based on https://stackoverflow.com/a/24287465. - - Arguments: - - object_dot_method: A bound instance method (i.e. 'object.method'). - """ - - def __init__(self, object_dot_method: MethodType) -> None: - self.target = weakref.ref(object_dot_method.__self__) - self._target_id = id(self.target()) - self.method = weakref.ref(object_dot_method.__func__) - self._method_id = id(self.method()) - - def __call__(self, *args: Any, **kwargs: Any) -> Any: - """ - Calls the method on target with args and kwargs. - """ - method = self.method() - assert method is not None - return method(self.target(), *args, **kwargs) - - def __hash__(self) -> int: - return hash(self.target) ^ hash(self.method) - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, WeakMethod): - return False - return ( - self._target_id == other._target_id and self._method_id == other._method_id - )