Skip to content

Commit

Permalink
Rename EventManager.event_queue to EventManager.queue
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavsingh committed Nov 8, 2021
1 parent 84b82a1 commit 8aeb4e6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 28 deletions.
8 changes: 4 additions & 4 deletions examples/pubsub_eventing.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ def on_event(payload: Dict[str, Any]) -> None:
start_time = time.time()
# Start eventing core
with EventManager() as event_manager:
assert event_manager.event_queue
assert event_manager.queue

# Create a subscriber.
# Internally, subscribe will start a separate thread
# to receive incoming published messages.
subscriber = EventSubscriber(event_manager.event_queue)
subscriber = EventSubscriber(event_manager.queue)
subscriber.subscribe(on_event)

# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.event_queue, ),
publisher_shutdown_event, event_manager.queue, ),
)
publisher.start()

Expand All @@ -80,7 +80,7 @@ def on_event(payload: Dict[str, Any]) -> None:
# between threads.
try:
while True:
event_manager.event_queue.publish(
event_manager.queue.publish(
request_id=main_publisher_request_id,
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
Expand Down
45 changes: 23 additions & 22 deletions proxy/core/event/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@


class EventManager:
"""Event manager is an encapsulation around various initialization, dispatcher
start / stop API required for end-to-end eventing.
"""Event manager is a context manager which provides
encapsulation around various setup and shutdown steps
to start the eventing core.
"""

def __init__(self) -> None:
self.event_queue: Optional[EventQueue] = None
self.event_dispatcher: Optional[EventDispatcher] = None
self.event_dispatcher_thread: Optional[threading.Thread] = None
self.event_dispatcher_shutdown: Optional[threading.Event] = None
self.queue: Optional[EventQueue] = None
self.dispatcher: Optional[EventDispatcher] = None
self.dispatcher_thread: Optional[threading.Thread] = None
self.dispatcher_shutdown: Optional[threading.Event] = None
self.manager: Optional[multiprocessing.managers.SyncManager] = None

def __enter__(self) -> 'EventManager':
Expand All @@ -59,26 +60,26 @@ def __exit__(

def setup(self) -> None:
self.manager = multiprocessing.Manager()
self.event_queue = EventQueue(self.manager.Queue())
self.event_dispatcher_shutdown = threading.Event()
assert self.event_dispatcher_shutdown
assert self.event_queue
self.event_dispatcher = EventDispatcher(
shutdown=self.event_dispatcher_shutdown,
event_queue=self.event_queue,
self.queue = EventQueue(self.manager.Queue())
self.dispatcher_shutdown = threading.Event()
assert self.dispatcher_shutdown
assert self.queue
self.dispatcher = EventDispatcher(
shutdown=self.dispatcher_shutdown,
event_queue=self.queue,
)
self.event_dispatcher_thread = threading.Thread(
target=self.event_dispatcher.run,
self.dispatcher_thread = threading.Thread(
target=self.dispatcher.run,
)
self.event_dispatcher_thread.start()
logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident)
self.dispatcher_thread.start()
logger.debug('Thread ID: %d', self.dispatcher_thread.ident)

def shutdown(self) -> None:
assert self.event_dispatcher_shutdown
assert self.event_dispatcher_thread
self.event_dispatcher_shutdown.set()
self.event_dispatcher_thread.join()
assert self.dispatcher_shutdown
assert self.dispatcher_thread
self.dispatcher_shutdown.set()
self.dispatcher_thread.join()
logger.debug(
'Shutdown of global event dispatcher thread %d successful',
self.event_dispatcher_thread.ident,
self.dispatcher_thread.ident,
)
2 changes: 1 addition & 1 deletion proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __enter__(self) -> 'Proxy':
self.pool = AcceptorPool(
flags=self.flags,
work_klass=self.work_klass,
event_queue=self.event_manager.event_queue if self.event_manager is not None else None,
event_queue=self.event_manager.queue if self.event_manager is not None else None,
)
self.pool.setup()
return self
Expand Down
2 changes: 1 addition & 1 deletion tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_enable_events(
mock_acceptor_pool.assert_called_with(
flags=mock_initialize.return_value,
work_klass=HttpProtocolHandler,
event_queue=mock_event_manager.return_value.event_queue,
event_queue=mock_event_manager.return_value.queue,
)
mock_acceptor_pool.return_value.setup.assert_called()
mock_acceptor_pool.return_value.shutdown.assert_called()
Expand Down

0 comments on commit 8aeb4e6

Please sign in to comment.