Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ClusterMetadata request_update() #1056

Merged
merged 11 commits into from
Oct 21, 2024
4 changes: 2 additions & 2 deletions aiokafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def failed_update(self, exception):
f = self._future
self._future = None
if f:
f.failure(exception)
f.set_exception(exception)
self._last_refresh_ms = time.time() * 1000

def update_metadata(self, metadata):
Expand Down Expand Up @@ -307,7 +307,7 @@ def update_metadata(self, metadata):
self._last_successful_refresh_ms = now

if f:
f.success(self)
f.set_result(self)
log.debug("Updated cluster metadata to %s", self)

for listener in self._listeners:
Expand Down
20 changes: 20 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,23 @@ def test_empty_broker_list():
)
)
assert len(cluster.brokers()) == 2


def test_request_update_expecting_success():
cluster = ClusterMetadata()
updated_cluster = cluster.request_update()
cluster.update_metadata(
MetadataResponse[0]([(0, "foo", 12), (1, "bar", 34)], []),
)
assert updated_cluster.result() == cluster


def test_request_update_expecting_failure():
cluster = ClusterMetadata()
updated_cluster = cluster.request_update()
test_metadata = MetadataResponse[0](
[], # empty brokers
[(17, "foo", []), (17, "bar", [])], # topics w/ error
)
cluster.update_metadata(test_metadata)
assert updated_cluster.exception() is not None
Loading