Skip to content

Commit

Permalink
Fix bug in passing --gnu-style-flags
Browse files Browse the repository at this point in the history
Set a maximum timeout of 5 minutes
Added debug messages to RMQ service manager and extend timeout for event processing
  • Loading branch information
langmm committed Jul 8, 2024
1 parent bbb83c8 commit 59e5c52
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
10 changes: 10 additions & 0 deletions yggdrasil/command_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,11 @@ class cc_flags(cc_toolname):
(('--toolname', ),
{'default': None,
'help': 'Name of the tool that associated flags be returned for.'}),
(('--gnu-style-flags', ),
{'action': 'store_true',
'help': ('Convert flags to use dashes (-) instead of '
'forward slashes (/) (used only with MSVC '
'compilation tools.')}),
] + DependencySpecialization.command_line_options

@classmethod
Expand All @@ -1332,6 +1337,11 @@ class ld_flags(cc_toolname):
(('--toolname', ),
{'default': None,
'help': 'Name of the tool that associated flags be returned for.'}),
(('--gnu-style-flags', ),
{'action': 'store_true',
'help': ('Convert flags to use dashes (-) instead of '
'forward slashes (/) (used only with MSVC '
'compilation tools.')}),
] + DependencySpecialization.command_line_options

@classmethod
Expand Down
4 changes: 4 additions & 0 deletions yggdrasil/multitasking.py
Original file line number Diff line number Diff line change
Expand Up @@ -1161,9 +1161,13 @@ def task_target():
loop = TaskLoop(target=task_target,
polling_interval=self.polling_interval)
loop.start()
if timeout is None:
timeout = 60.0 * 5.0 # 5 minutes
loop.join(timeout)
if loop.is_alive():
loop.kill()
logger.info(f"{self.logname}: Task taking longer than "
f"{timeout} s")
if on_timeout is True:
return self.function()
elif (on_timeout is False):
Expand Down
23 changes: 21 additions & 2 deletions yggdrasil/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ def is_installed(cls):

def _init_rmq(self, *args, **kwargs):
from yggdrasil.communication.RMQComm import pika, get_rmq_parameters
self.info("_init_rmq begin")
self.pika = pika
if not self.address:
kwargs['port'] = self.port
Expand All @@ -483,6 +484,7 @@ def _init_rmq(self, *args, **kwargs):
parameters = pika.URLParameters(self.address)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.info("_init_rmq end")

def setup_server(self, *args, **kwargs):
r"""Set up the machinery for receiving requests.
Expand All @@ -495,6 +497,7 @@ def setup_server(self, *args, **kwargs):
"""
self._init_rmq(*args, **kwargs)
self.info("setup_server begin")
if self.exchange: # pragma: debug
# self.channel.exchange_declare(exchange=self.exchange,
# auto_delete=True)
Expand All @@ -507,6 +510,7 @@ def setup_server(self, *args, **kwargs):
on_message_callback=self._on_request)
cb = functools.partial(self.shutdown, in_callback=True)
self.channel.add_on_cancel_callback(cb)
self.info("setup_server end")

def setup_client(self, *args, **kwargs):
r"""Set up the machinery for sending requests.
Expand All @@ -519,23 +523,29 @@ def setup_client(self, *args, **kwargs):
"""
self._init_rmq(*args, **kwargs)
self.info("setup_client begin")
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.consumer_tag = self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=True)
self.info("setup_client end")

def run_server(self):
r"""Listen for requests."""
self.info("run_server begin")
try:
self.channel.start_consuming()
except self.pika.exceptions.ChannelWrongStateError: # pragma: debug
pass
self.info("run_server end")

def shutdown(self, in_callback=False):
r"""Shutdown the process from the server."""
self.info("shutdown begin")
if not self.channel: # pragma: debug
self.info("shutdown end - no channel")
return
if self.for_request:
queue = self.callback_queue
Expand All @@ -545,32 +555,38 @@ def shutdown(self, in_callback=False):
if not in_callback:
self.channel.basic_cancel(consumer_tag=self.consumer_tag)
if not self.for_request:
self.info("shutdown end - not for_request")
return
self.channel.queue_delete(queue=queue)
self.channel.close()
self.channel = None
self.connection.close()
self.connection = None
self.info("shutdown end")

def _on_request(self, ch, method, props, body):
self.info("_on_request begin")
response = self.process_request(body)
ch.basic_publish(exchange=self.exchange,
routing_key=props.reply_to,
properties=self.pika.BasicProperties(
correlation_id=props.correlation_id),
body=response)
ch.basic_ack(delivery_tag=method.delivery_tag)
self.info("_on_request end")

def _on_response(self, ch, method, props, body):
self.info("_on_response begin")
if self.corr_id == props.correlation_id:
self.response.set(body)
self.info("_on_response end")

@property
def is_running(self):
r"""bool: True if the server is running."""
return (super(RMQService, self).is_running and bool(self.channel))

def call(self, request, timeout=10.0, **kwargs):
def call(self, request, timeout=60.0, **kwargs):
r"""Send a request.
Args:
Expand All @@ -583,6 +599,7 @@ def call(self, request, timeout=10.0, **kwargs):
str: Serialized response.
"""
self.info("call begin")
self.response = ValueEvent()
self.corr_id = str(uuid.uuid4())
try:
Expand All @@ -607,7 +624,9 @@ def client_error():
wait_on_function(
process_events, timeout=timeout, polling_interval=0.5,
on_timeout=client_error)
return self.response.get()
out = self.response.get()
self.info("call end")
return out


def create_service_manager_class(service_type=None):
Expand Down

0 comments on commit 59e5c52

Please sign in to comment.