diff --git a/examples/pubsub_eventing.py b/examples/pubsub_eventing.py index 8d440eaff6..16436eedc8 100644 --- a/examples/pubsub_eventing.py +++ b/examples/pubsub_eventing.py @@ -58,12 +58,12 @@ def on_event(payload: Dict[str, Any]) -> None: start_time = time.time() # Start eventing core with EventManager() as event_manager: - assert event_manager.event_queue + assert event_manager.queue # Create a subscriber. # Internally, subscribe will start a separate thread # to receive incoming published messages. - subscriber = EventSubscriber(event_manager.event_queue) + subscriber = EventSubscriber(event_manager.queue) subscriber.subscribe(on_event) # Start a publisher process to demonstrate safe exchange @@ -71,7 +71,7 @@ def on_event(payload: Dict[str, Any]) -> None: publisher_shutdown_event = multiprocessing.Event() publisher = multiprocessing.Process( target=publisher_process, args=( - publisher_shutdown_event, event_manager.event_queue, ), + publisher_shutdown_event, event_manager.queue, ), ) publisher.start() @@ -80,7 +80,7 @@ def on_event(payload: Dict[str, Any]) -> None: # between threads. try: while True: - event_manager.event_queue.publish( + event_manager.queue.publish( request_id=main_publisher_request_id, event_name=eventNames.WORK_STARTED, event_payload={'time': time.time()}, diff --git a/proxy/core/event/manager.py b/proxy/core/event/manager.py index b8110ebf68..6d831251a1 100644 --- a/proxy/core/event/manager.py +++ b/proxy/core/event/manager.py @@ -34,15 +34,16 @@ class EventManager: - """Event manager is an encapsulation around various initialization, dispatcher - start / stop API required for end-to-end eventing. + """Event manager is a context manager which provides + encapsulation around various setup and shutdown steps + to start the eventing core. """ def __init__(self) -> None: - self.event_queue: Optional[EventQueue] = None - self.event_dispatcher: Optional[EventDispatcher] = None - self.event_dispatcher_thread: Optional[threading.Thread] = None - self.event_dispatcher_shutdown: Optional[threading.Event] = None + self.queue: Optional[EventQueue] = None + self.dispatcher: Optional[EventDispatcher] = None + self.dispatcher_thread: Optional[threading.Thread] = None + self.dispatcher_shutdown: Optional[threading.Event] = None self.manager: Optional[multiprocessing.managers.SyncManager] = None def __enter__(self) -> 'EventManager': @@ -59,26 +60,26 @@ def __exit__( def setup(self) -> None: self.manager = multiprocessing.Manager() - self.event_queue = EventQueue(self.manager.Queue()) - self.event_dispatcher_shutdown = threading.Event() - assert self.event_dispatcher_shutdown - assert self.event_queue - self.event_dispatcher = EventDispatcher( - shutdown=self.event_dispatcher_shutdown, - event_queue=self.event_queue, + self.queue = EventQueue(self.manager.Queue()) + self.dispatcher_shutdown = threading.Event() + assert self.dispatcher_shutdown + assert self.queue + self.dispatcher = EventDispatcher( + shutdown=self.dispatcher_shutdown, + event_queue=self.queue, ) - self.event_dispatcher_thread = threading.Thread( - target=self.event_dispatcher.run, + self.dispatcher_thread = threading.Thread( + target=self.dispatcher.run, ) - self.event_dispatcher_thread.start() - logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident) + self.dispatcher_thread.start() + logger.debug('Thread ID: %d', self.dispatcher_thread.ident) def shutdown(self) -> None: - assert self.event_dispatcher_shutdown - assert self.event_dispatcher_thread - self.event_dispatcher_shutdown.set() - self.event_dispatcher_thread.join() + assert self.dispatcher_shutdown + assert self.dispatcher_thread + self.dispatcher_shutdown.set() + self.dispatcher_thread.join() logger.debug( 'Shutdown of global event dispatcher thread %d successful', - self.event_dispatcher_thread.ident, + self.dispatcher_thread.ident, ) diff --git a/proxy/proxy.py b/proxy/proxy.py index 72b536310f..4c65178610 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -118,7 +118,7 @@ def __enter__(self) -> 'Proxy': self.pool = AcceptorPool( flags=self.flags, work_klass=self.work_klass, - event_queue=self.event_manager.event_queue if self.event_manager is not None else None, + event_queue=self.event_manager.queue if self.event_manager is not None else None, ) self.pool.setup() return self diff --git a/tests/test_main.py b/tests/test_main.py index e1bb21f478..20757914ab 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -132,7 +132,7 @@ def test_enable_events( mock_acceptor_pool.assert_called_with( flags=mock_initialize.return_value, work_klass=HttpProtocolHandler, - event_queue=mock_event_manager.return_value.event_queue, + event_queue=mock_event_manager.return_value.queue, ) mock_acceptor_pool.return_value.setup.assert_called() mock_acceptor_pool.return_value.shutdown.assert_called()