Skip to content

Commit

Permalink
EventManager is also a context manager (#709)
Browse files Browse the repository at this point in the history
* `EventManager` is also a context manager

* unused

* Rename `EventManager.event_queue` to `EventManager.queue`
  • Loading branch information
abhinavsingh authored Nov 8, 2021
1 parent c6eaace commit aadcc10
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 79 deletions.
91 changes: 45 additions & 46 deletions examples/pubsub_eventing.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def publisher_process(
logger.info('publisher shutdown')


# Execute within a separate thread context
def on_event(payload: Dict[str, Any]) -> None:
'''Subscriber callback.'''
global num_events_received
Expand All @@ -55,49 +56,47 @@ def on_event(payload: Dict[str, Any]) -> None:

if __name__ == '__main__':
start_time = time.time()

# Start dispatcher thread using EventManager
event_manager = EventManager()
event_manager.start_event_dispatcher()
assert event_manager.event_queue

# Create a subscriber
subscriber = EventSubscriber(event_manager.event_queue)
# Internally, subscribe will start a separate thread
# to receive incoming published messages
subscriber.subscribe(on_event)

# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.event_queue, ),
)
publisher.start()

try:
while True:
# Dispatch event from main process
event_manager.event_queue.publish(
request_id=main_publisher_request_id,
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_main',
)
except KeyboardInterrupt:
logger.info('bye!!!')
finally:
# Stop publisher
publisher_shutdown_event.set()
publisher.join()
# Stop subscriber thread
subscriber.unsubscribe()
# Signal dispatcher to shutdown
event_manager.stop_event_dispatcher()
logger.info(
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
num_events_received[0], num_events_received[1], time.time(
) - start_time,
),
)
# Start eventing core
with EventManager() as event_manager:
assert event_manager.queue

# Create a subscriber.
# Internally, subscribe will start a separate thread
# to receive incoming published messages.
subscriber = EventSubscriber(event_manager.queue)
subscriber.subscribe(on_event)

# Start a publisher process to demonstrate safe exchange
# of messages between processes.
publisher_shutdown_event = multiprocessing.Event()
publisher = multiprocessing.Process(
target=publisher_process, args=(
publisher_shutdown_event, event_manager.queue, ),
)
publisher.start()

# Dispatch event from main process too
# to demonstrate safe exchange of messages
# between threads.
try:
while True:
event_manager.queue.publish(
request_id=main_publisher_request_id,
event_name=eventNames.WORK_STARTED,
event_payload={'time': time.time()},
publisher_id='eventing_pubsub_main',
)
except KeyboardInterrupt:
logger.info('bye!!!')
finally:
# Stop publisher
publisher_shutdown_event.set()
publisher.join()
# Stop subscriber thread
subscriber.unsubscribe()
logger.info(
'Received {0} events from main thread, {1} events from another process, in {2} seconds'.format(
num_events_received[0], num_events_received[1], time.time(
) - start_time,
),
)
64 changes: 39 additions & 25 deletions proxy/core/event/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import threading
import multiprocessing

from typing import Optional
from typing import Optional, Type
from types import TracebackType

from .queue import EventQueue
from .dispatcher import EventDispatcher
Expand All @@ -33,39 +34,52 @@


class EventManager:
"""Event manager is an encapsulation around various initialization, dispatcher
start / stop API required for end-to-end eventing.
"""Event manager is a context manager which provides
encapsulation around various setup and shutdown steps
to start the eventing core.
"""

def __init__(self) -> None:
self.event_queue: Optional[EventQueue] = None
self.event_dispatcher: Optional[EventDispatcher] = None
self.event_dispatcher_thread: Optional[threading.Thread] = None
self.event_dispatcher_shutdown: Optional[threading.Event] = None
self.queue: Optional[EventQueue] = None
self.dispatcher: Optional[EventDispatcher] = None
self.dispatcher_thread: Optional[threading.Thread] = None
self.dispatcher_shutdown: Optional[threading.Event] = None
self.manager: Optional[multiprocessing.managers.SyncManager] = None

def start_event_dispatcher(self) -> None:
def __enter__(self) -> 'EventManager':
self.setup()
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.shutdown()

def setup(self) -> None:
self.manager = multiprocessing.Manager()
self.event_queue = EventQueue(self.manager.Queue())
self.event_dispatcher_shutdown = threading.Event()
assert self.event_dispatcher_shutdown
assert self.event_queue
self.event_dispatcher = EventDispatcher(
shutdown=self.event_dispatcher_shutdown,
event_queue=self.event_queue,
self.queue = EventQueue(self.manager.Queue())
self.dispatcher_shutdown = threading.Event()
assert self.dispatcher_shutdown
assert self.queue
self.dispatcher = EventDispatcher(
shutdown=self.dispatcher_shutdown,
event_queue=self.queue,
)
self.event_dispatcher_thread = threading.Thread(
target=self.event_dispatcher.run,
self.dispatcher_thread = threading.Thread(
target=self.dispatcher.run,
)
self.event_dispatcher_thread.start()
logger.debug('Thread ID: %d', self.event_dispatcher_thread.ident)
self.dispatcher_thread.start()
logger.debug('Thread ID: %d', self.dispatcher_thread.ident)

def stop_event_dispatcher(self) -> None:
assert self.event_dispatcher_shutdown
assert self.event_dispatcher_thread
self.event_dispatcher_shutdown.set()
self.event_dispatcher_thread.join()
def shutdown(self) -> None:
assert self.dispatcher_shutdown
assert self.dispatcher_thread
self.dispatcher_shutdown.set()
self.dispatcher_thread.join()
logger.debug(
'Shutdown of global event dispatcher thread %d successful',
self.event_dispatcher_thread.ident,
self.dispatcher_thread.ident,
)
6 changes: 3 additions & 3 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ def __enter__(self) -> 'Proxy':
if self.flags.enable_events:
logger.info('Core Event enabled')
self.event_manager = EventManager()
self.event_manager.start_event_dispatcher()
self.event_manager.setup()
self.pool = AcceptorPool(
flags=self.flags,
work_klass=self.work_klass,
event_queue=self.event_manager.event_queue if self.event_manager is not None else None,
event_queue=self.event_manager.queue if self.event_manager is not None else None,
)
self.pool.setup()
return self
Expand All @@ -133,7 +133,7 @@ def __exit__(
self.pool.shutdown()
if self.flags.enable_events:
assert self.event_manager is not None
self.event_manager.stop_event_dispatcher()
self.event_manager.shutdown()


def main(
Expand Down
10 changes: 5 additions & 5 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ def test_enable_events(
mock_initialize.return_value.enable_events = True
main([])
mock_event_manager.assert_called_once()
mock_event_manager.return_value.start_event_dispatcher.assert_called_once()
mock_event_manager.return_value.stop_event_dispatcher.assert_called_once()
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()
mock_acceptor_pool.assert_called_with(
flags=mock_initialize.return_value,
work_klass=HttpProtocolHandler,
event_queue=mock_event_manager.return_value.event_queue,
event_queue=mock_event_manager.return_value.queue,
)
mock_acceptor_pool.return_value.setup.assert_called()
mock_acceptor_pool.return_value.shutdown.assert_called()
Expand Down Expand Up @@ -171,8 +171,8 @@ def test_enable_dashboard(
mock_acceptor_pool.return_value.setup.assert_called()
# dashboard will also enable eventing
mock_event_manager.assert_called_once()
mock_event_manager.return_value.start_event_dispatcher.assert_called_once()
mock_event_manager.return_value.stop_event_dispatcher.assert_called_once()
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()

@mock.patch('time.sleep')
@mock.patch('proxy.common.plugins.Plugins.load')
Expand Down

0 comments on commit aadcc10

Please sign in to comment.