diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index 23e688bb..ccee5636 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -209,7 +209,7 @@ def failed_update(self, exception): f = self._future self._future = None if f: - f.failure(exception) + f.set_exception(exception) self._last_refresh_ms = time.time() * 1000 def update_metadata(self, metadata): @@ -307,7 +307,7 @@ def update_metadata(self, metadata): self._last_successful_refresh_ms = now if f: - f.success(self) + f.set_result(self) log.debug("Updated cluster metadata to %s", self) for listener in self._listeners: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 3c10f316..8af2583b 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -19,3 +19,23 @@ def test_empty_broker_list(): ) ) assert len(cluster.brokers()) == 2 + + +def test_request_update_expecting_success(): + cluster = ClusterMetadata() + updated_cluster = cluster.request_update() + cluster.update_metadata( + MetadataResponse[0]([(0, "foo", 12), (1, "bar", 34)], []), + ) + assert updated_cluster.result() == cluster + + +def test_request_update_expecting_failure(): + cluster = ClusterMetadata() + updated_cluster = cluster.request_update() + test_metadata = MetadataResponse[0]( + [], # empty brokers + [(17, "foo", []), (17, "bar", [])], # topics w/ error + ) + cluster.update_metadata(test_metadata) + assert updated_cluster.exception() is not None