From aadcc10813a859645d1a1d4dde97d90a0b5223b3 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Mon, 8 Nov 2021 23:02:36 +0530 Subject: [PATCH] `EventManager` is also a context manager (#709) * `EventManager` is also a context manager * unused * Rename `EventManager.event_queue` to `EventManager.queue` --- examples/pubsub_eventing.py | 91 ++++++++++++++++++------------------- proxy/core/event/manager.py | 64 ++++++++++++++++---------- proxy/proxy.py | 6 +-- tests/test_main.py | 10 ++-- 4 files changed, 92 insertions(+), 79 deletions(-) diff --git a/examples/pubsub_eventing.py b/examples/pubsub_eventing.py index 01dff1e535..16436eedc8 100644 --- a/examples/pubsub_eventing.py +++ b/examples/pubsub_eventing.py @@ -44,6 +44,7 @@ def publisher_process( logger.info('publisher shutdown') +# Execute within a separate thread context def on_event(payload: Dict[str, Any]) -> None: '''Subscriber callback.''' global num_events_received @@ -55,49 +56,47 @@ def on_event(payload: Dict[str, Any]) -> None: if __name__ == '__main__': start_time = time.time() - - # Start dispatcher thread using EventManager - event_manager = EventManager() - event_manager.start_event_dispatcher() - assert event_manager.event_queue - - # Create a subscriber - subscriber = EventSubscriber(event_manager.event_queue) - # Internally, subscribe will start a separate thread - # to receive incoming published messages - subscriber.subscribe(on_event) - - # Start a publisher process to demonstrate safe exchange - # of messages between processes. - publisher_shutdown_event = multiprocessing.Event() - publisher = multiprocessing.Process( - target=publisher_process, args=( - publisher_shutdown_event, event_manager.event_queue, ), - ) - publisher.start() - - try: - while True: - # Dispatch event from main process - event_manager.event_queue.publish( - request_id=main_publisher_request_id, - event_name=eventNames.WORK_STARTED, - event_payload={'time': time.time()}, - publisher_id='eventing_pubsub_main', - ) - except KeyboardInterrupt: - logger.info('bye!!!') - finally: - # Stop publisher - publisher_shutdown_event.set() - publisher.join() - # Stop subscriber thread - subscriber.unsubscribe() - # Signal dispatcher to shutdown - event_manager.stop_event_dispatcher() - logger.info( - 'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format( - num_events_received[0], num_events_received[1], time.time( - ) - start_time, - ), - ) + # Start eventing core + with EventManager() as event_manager: + assert event_manager.queue + + # Create a subscriber. + # Internally, subscribe will start a separate thread + # to receive incoming published messages. + subscriber = EventSubscriber(event_manager.queue) + subscriber.subscribe(on_event) + + # Start a publisher process to demonstrate safe exchange + # of messages between processes. + publisher_shutdown_event = multiprocessing.Event() + publisher = multiprocessing.Process( + target=publisher_process, args=( + publisher_shutdown_event, event_manager.queue, ), + ) + publisher.start() + + # Dispatch event from main process too + # to demonstrate safe exchange of messages + # between threads. + try: + while True: + event_manager.queue.publish( + request_id=main_publisher_request_id, + event_name=eventNames.WORK_STARTED, + event_payload={'time': time.time()}, + publisher_id='eventing_pubsub_main', + ) + except KeyboardInterrupt: + logger.info('bye!!!') + finally: + # Stop publisher + publisher_shutdown_event.set() + publisher.join() + # Stop subscriber thread + subscriber.unsubscribe() + logger.info( + 'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format( + num_events_received[0], num_events_received[1], time.time( + ) - start_time, + ), + ) diff --git a/proxy/core/event/manager.py b/proxy/core/event/manager.py index 0f22c52d15..6d831251a1 100644 --- a/proxy/core/event/manager.py +++ b/proxy/core/event/manager.py @@ -12,7 +12,8 @@ import threading import multiprocessing -from typing import Optional +from typing import Optional, Type +from types import TracebackType from .queue import EventQueue from .dispatcher import EventDispatcher @@ -33,39 +34,52 @@ class EventManager: - """Event manager is an encapsulation around various initialization, dispatcher - start / stop API required for end-to-end eventing. + """Event manager is a context manager which provides + encapsulation around various setup and shutdown steps + to start the eventing core. """ def __init__(self) -> None: - self.event_queue: Optional[EventQueue] = None - self.event_dispatcher: Optional[EventDispatcher] = None - self.event_dispatcher_thread: Optional[threading.Thread] = None - self.event_dispatcher_shutdown: Optional[threading.Event] = None + self.queue: Optional[EventQueue] = None + self.dispatcher: Optional[EventDispatcher] = None + self.dispatcher_thread: Optional[threading.Thread] = None + self.dispatcher_shutdown: Optional[threading.Event] = None self.manager: Optional[multiprocessing.managers.SyncManager] = None - def start_event_dispatcher(self) -> None: + def __enter__(self) -> 'EventManager': + self.setup() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.shutdown() + + def setup(self) -> None: self.manager = multiprocessing.Manager() - self.event_queue = EventQueue(self.manager.Queue()) - self.event_dispatcher_shutdown = threading.Event() - assert self.event_dispatcher_shutdown - assert self.event_queue - self.event_dispatcher = EventDispatcher( - shutdown=self.event_dispatcher_shutdown, - event_queue=self.event_queue, + self.queue = EventQueue(self.manager.Queue()) + self.dispatcher_shutdown = threading.Event() + assert self.dispatcher_shutdown + assert self.queue + self.dispatcher = EventDispatcher( + shutdown=self.dispatcher_shutdown, + event_queue=self.queue, ) - self.event_dispatcher_thread = threading.Thread( - target=self.event_dispatcher.run, + self.dispatcher_thread = threading.Thread( + target=self.dispatcher.run, ) - self.event_dispatcher_thread.start() - logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident) + self.dispatcher_thread.start() + logger.debug('Thread ID: %d', self.dispatcher_thread.ident) - def stop_event_dispatcher(self) -> None: - assert self.event_dispatcher_shutdown - assert self.event_dispatcher_thread - self.event_dispatcher_shutdown.set() - self.event_dispatcher_thread.join() + def shutdown(self) -> None: + assert self.dispatcher_shutdown + assert self.dispatcher_thread + self.dispatcher_shutdown.set() + self.dispatcher_thread.join() logger.debug( 'Shutdown of global event dispatcher thread %d successful', - self.event_dispatcher_thread.ident, + self.dispatcher_thread.ident, ) diff --git a/proxy/proxy.py b/proxy/proxy.py index ced5c53c00..4c65178610 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -114,11 +114,11 @@ def __enter__(self) -> 'Proxy': if self.flags.enable_events: logger.info('Core Event enabled') self.event_manager = EventManager() - self.event_manager.start_event_dispatcher() + self.event_manager.setup() self.pool = AcceptorPool( flags=self.flags, work_klass=self.work_klass, - event_queue=self.event_manager.event_queue if self.event_manager is not None else None, + event_queue=self.event_manager.queue if self.event_manager is not None else None, ) self.pool.setup() return self @@ -133,7 +133,7 @@ def __exit__( self.pool.shutdown() if self.flags.enable_events: assert self.event_manager is not None - self.event_manager.stop_event_dispatcher() + self.event_manager.shutdown() def main( diff --git a/tests/test_main.py b/tests/test_main.py index 4948b86d5c..20757914ab 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -127,12 +127,12 @@ def test_enable_events( mock_initialize.return_value.enable_events = True main([]) mock_event_manager.assert_called_once() - mock_event_manager.return_value.start_event_dispatcher.assert_called_once() - mock_event_manager.return_value.stop_event_dispatcher.assert_called_once() + mock_event_manager.return_value.setup.assert_called_once() + mock_event_manager.return_value.shutdown.assert_called_once() mock_acceptor_pool.assert_called_with( flags=mock_initialize.return_value, work_klass=HttpProtocolHandler, - event_queue=mock_event_manager.return_value.event_queue, + event_queue=mock_event_manager.return_value.queue, ) mock_acceptor_pool.return_value.setup.assert_called() mock_acceptor_pool.return_value.shutdown.assert_called() @@ -171,8 +171,8 @@ def test_enable_dashboard( mock_acceptor_pool.return_value.setup.assert_called() # dashboard will also enable eventing mock_event_manager.assert_called_once() - mock_event_manager.return_value.start_event_dispatcher.assert_called_once() - mock_event_manager.return_value.stop_event_dispatcher.assert_called_once() + mock_event_manager.return_value.setup.assert_called_once() + mock_event_manager.return_value.shutdown.assert_called_once() @mock.patch('time.sleep') @mock.patch('proxy.common.plugins.Plugins.load')