Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Fully-featured server stop functionality #138

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion baseHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def run(self):
input = self.queue_in.get()
if isinstance(input, bytes) and input == b"END":
# sentinelle signal to avoid queue deadlock
logger.debug("Stopping thread")
logger.debug(f"{self.__class__.__name__}: Received END message, stopping thread")
break
start_time = perf_counter()
for output in self.process(input):
Expand All @@ -44,6 +44,10 @@ def run(self):
self.cleanup()
self.queue_out.put(b"END")

def stop(self):
logger.debug(f"{self.__class__.__name__}: Stopping pipeline component..")
self.stop_event.set()

@property
def last_time(self):
return self._times[-1]
Expand Down
7 changes: 6 additions & 1 deletion connections/socket_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def run(self):
self.socket.listen(1)
logger.info("Receiver waiting to be connected...")
self.conn, _ = self.socket.accept()
logger.info("receiver connected")
logger.info("Receiver connected")

self.should_listen.set()
while not self.stop_event.is_set():
Expand All @@ -58,3 +58,8 @@ def run(self):
self.queue_out.put(audio_chunk)
self.conn.close()
logger.info("Receiver closed")

def stop(self):
logger.debug("Receiver is in stopping process. Sending END message to the next component to initiate server termination..")
self.queue_out.put(b"END")
self.stop_event.set()
35 changes: 25 additions & 10 deletions connections/socket_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self, stop_event, queue_in, host="0.0.0.0", port=12346):
self.stop_event = stop_event
self.queue_in = queue_in
self.host = host
self.socket = None
self.conn = None
self.port = port

def run(self):
Expand All @@ -24,13 +26,26 @@ def run(self):
self.socket.bind((self.host, self.port))
self.socket.listen(1)
logger.info("Sender waiting to be connected...")
self.conn, _ = self.socket.accept()
logger.info("sender connected")

while not self.stop_event.is_set():
audio_chunk = self.queue_in.get()
self.conn.sendall(audio_chunk)
if isinstance(audio_chunk, bytes) and audio_chunk == b"END":
break
self.conn.close()
logger.info("Sender closed")

try:
self.conn, _ = self.socket.accept()
logger.info("Sender connected")

while not self.stop_event.is_set():
audio_chunk = self.queue_in.get()
self.conn.sendall(audio_chunk)
if isinstance(audio_chunk, bytes) and audio_chunk == b"END":
break
except OSError as e:
# Handle exception due to socket shutdown
logger.debug(f"SocketSender received exception: {e}. Possibly the sever is in termination process..")

finally:
if self.conn is not None:
self.conn.close()
logger.info("Sender closed")

def stop(self):
self.stop_event.set()
logger.debug("SocketSender: shutdown socket")
self.socket.shutdown(socket.SHUT_RDWR) # Shutdown the socket to overcome blocking socket calls (e.g. accept())
6 changes: 5 additions & 1 deletion s2s_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,13 @@ def main():

try:
pipeline_manager.start()
pipeline_manager.join_all()
except KeyboardInterrupt:
pipeline_manager.stop()
print("Stopping server..")

finally:
pipeline_manager.stop()
print("Server closed.")

if __name__ == "__main__":
main()
17 changes: 14 additions & 3 deletions utils/thread_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import threading
import logging

logger = logging.getLogger(__name__)

class ThreadManager:
"""
Expand All @@ -15,9 +17,18 @@ def start(self):
thread = threading.Thread(target=handler.run)
self.threads.append(thread)
thread.start()
logger.debug(f'Thread {thread.ident} has started. Target: {thread.name}, type: {type(handler)}')

def join_all(self):
for thread in self.threads:
logger.debug(f'Thread {thread.ident} attempt to join. Target: {thread.name}')
# Allow the main thread to remain responsive to KeyboardInterrupt while waiting for a child thread to finish
while thread.is_alive():
thread.join(timeout=0.5)
logger.debug(f'Thread {thread.ident} has joined. Target: {thread.name}')

def stop(self):
logger.debug("Server stop was invoked")
for handler in self.handlers:
handler.stop_event.set()
for thread in self.threads:
thread.join()
handler.stop()
self.join_all()