Skip to content

Commit

Permalink
Topic: Remove topic with active_partitions from Conductor after (#531)
Browse files Browse the repository at this point in the history
Co-authored-by: o.stetsenko <[email protected]>
  • Loading branch information
ostetsenko and o.stetsenko authored Jun 30, 2023
1 parent 7eea54a commit 236516d
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
10 changes: 10 additions & 0 deletions faust/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,16 @@ async def declare(self) -> None:
retention=self.retention,
)

def on_stop_iteration(self) -> None:
"""Signal that iteration over this channel was stopped.
Tip:
Remember to call ``super`` when overriding this method.
"""
super().on_stop_iteration()
if self.active_partitions is not None:
# Remove topics for isolated partitions from the Conductor.
self.app.topics.discard(cast(TopicT, self))

def __aiter__(self) -> ChannelT:
if self.is_iterator:
return self
Expand Down
15 changes: 10 additions & 5 deletions tests/functional/agents/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ def __init__(
self.isolated_partitions = isolated_partitions

self.topic_name = self.name
self.tps = [TP(self.topic_name, p) for p in self.partitions]
self.next_tp = cycle(self.tps)
self.expected_tp = cycle(self.tps)
self._set_tps_from_partitions()
self.seen_offsets = set()
self.processed_total = 0

Expand All @@ -72,6 +70,11 @@ def __init__(
self.agent_stopped_processing = asyncio.Event()
self.finished = asyncio.Event()

def _set_tps_from_partitions(self):
self.tps = [TP(self.topic_name, p) for p in self.partitions]
self.next_tp = cycle(self.tps)
self.expected_tp = cycle(self.tps)

async def on_start(self) -> None:
app = self.app
topic = app.topic(self.topic_name, value_serializer=self.value_serializer)
Expand Down Expand Up @@ -153,8 +156,10 @@ async def _send(self) -> None:

self.finished.set()

async def conductor_setup(self, assigned: Set[TP]) -> None:
await self.app.agents.on_rebalance(set(), assigned)
async def conductor_setup(
self, assigned: Set[TP], revoked: Optional[Set[TP]] = None
) -> None:
await self.app.agents.on_rebalance(revoked or set(), assigned)
await self.app.topics._update_indices()
await self.app.topics.on_partitions_assigned(assigned)

Expand Down
59 changes: 57 additions & 2 deletions tests/functional/agents/test_isolated_partitions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from collections import Counter
from pprint import pformat
from typing import List, NamedTuple
from typing import Any, List, Mapping, NamedTuple

import pytest

from faust.exceptions import ImproperlyConfigured
from faust.types import EventT, StreamT
from faust.types import AppT, EventT, Message as MessageT, StreamT

from .helpers import AgentCase

Expand Down Expand Up @@ -51,6 +51,25 @@ async def test_agent_isolated_partitions__concurrency(*, app, logging):
)


@pytest.mark.asyncio
async def test_agent_isolated_partitions_rebalancing(*, app, logging):
await AgentIsolatedRebalanceCase.run_test(
app=app,
num_messages=100,
concurrency=1,
partitions=[0, 1, 2, 3],
reassign_partitions={
10: [0],
20: [1],
30: [0, 1],
40: [2, 3],
50: [0, 1, 2, 3],
60: [4, 5, 6, 7],
},
isolated_partitions=True,
)


class AgentIsolatedCase(AgentCase):
name = "test_agent_isolated_partitions"

Expand Down Expand Up @@ -89,3 +108,39 @@ async def assert_success(self) -> None:
if max_ is None:
max_ = total
assert total == max_


class AgentIsolatedRebalanceCase(AgentCase):
name = "test_agent_isolated_partitions_rebalancing"

@classmethod
async def run_test(
cls, app: AppT, *, reassign_partitions: Mapping[int, List[int]], **kwargs: Any
) -> "AgentCase":
return await super().run_test(
app, reassign_partitions=reassign_partitions, **kwargs
)

def __init__(
self, app: AppT, *, reassign_partitions: Mapping[int, List[int]], **kwargs: Any
) -> None:
super().__init__(app, **kwargs)
self.reassign_partitions = reassign_partitions

async def put(self, key: bytes, value: bytes, **kwargs: Any) -> MessageT:
message = await super().put(key, value, **kwargs)

new_partitions = self.reassign_partitions.get(int(message.key))
if new_partitions is not None:
await self.simulate_rebalance(new_partitions)

return message

async def simulate_rebalance(self, partitions: List[int]):
await self.sleep(0.1)
self.partitions = sorted(partitions)
current_tps = set(self.tps)
self._set_tps_from_partitions()
assigned = set(self.tps)
revoked = current_tps - assigned
await self.conductor_setup(assigned=assigned, revoked=revoked)

0 comments on commit 236516d

Please sign in to comment.