Skip to content

Commit

Permalink
Remove ability to set monitoring log directory separate from rundir (#…
Browse files Browse the repository at this point in the history
…3699)

Prior to this PR, MonitoringHub had a logdir parameter which let the log
directory be set separately from the DFK-level run directory.

Other Parsl components generally don't let the user set this unless
there is a specific reason.

So this PR removes that feature, reducing the amount of state to be
threaded around.

When reading this patch, note that what the DFK calls the rundir is a
different directory vs what a Config object calls the rundir.

# Changed Behaviour

This removes a parameter from user facing MonitoringHub configuration.

## Type of change

- Code maintenance/cleanup
  • Loading branch information
benclifford authored Nov 14, 2024
1 parent 3eb7e93 commit 9fb5269
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 27 deletions.
2 changes: 0 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ def __init__(self, config: Config) -> None:
self.monitoring = config.monitoring

if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.monitoring.start(self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
Expand Down
12 changes: 6 additions & 6 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,20 +279,20 @@ class Resource(Base):
class DatabaseManager:
def __init__(self,
db_url: str = 'sqlite:///runinfo/monitoring.db',
logdir: str = '.',
run_dir: str = '.',
logging_level: int = logging.INFO,
batching_interval: float = 1,
batching_threshold: float = 99999,
):

self.workflow_end = False
self.workflow_start_message: Optional[MonitoringMessage] = None
self.logdir = logdir
os.makedirs(self.logdir, exist_ok=True)
self.run_dir = run_dir
os.makedirs(self.run_dir, exist_ok=True)

logger.propagate = False

set_file_logger("{}/database_manager.log".format(self.logdir), level=logging_level,
set_file_logger(f"{self.run_dir}/database_manager.log", level=logging_level,
format_string="%(asctime)s.%(msecs)03d %(name)s:%(lineno)d [%(levelname)s] [%(threadName)s %(thread)d] %(message)s",
name="database_manager")

Expand Down Expand Up @@ -681,7 +681,7 @@ def close(self) -> None:
def dbm_starter(exception_q: mpq.Queue,
resource_msgs: mpq.Queue,
db_url: str,
logdir: str,
run_dir: str,
logging_level: int) -> None:
"""Start the database manager process
Expand All @@ -692,7 +692,7 @@ def dbm_starter(exception_q: mpq.Queue,

try:
dbm = DatabaseManager(db_url=db_url,
logdir=logdir,
run_dir=run_dir,
logging_level=logging_level)
logger.info("Starting dbm in dbm starter")
dbm.start(resource_msgs)
Expand Down
19 changes: 6 additions & 13 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def __init__(self,
workflow_name: Optional[str] = None,
workflow_version: Optional[str] = None,
logging_endpoint: Optional[str] = None,
logdir: Optional[str] = None,
monitoring_debug: bool = False,
resource_monitoring_enabled: bool = True,
resource_monitoring_interval: float = 30): # in seconds
Expand Down Expand Up @@ -73,8 +72,6 @@ def __init__(self,
The database connection url for monitoring to log the information.
These URLs follow RFC-1738, and can include username, password, hostname, database name.
Default: sqlite, in the configured run_dir.
logdir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
monitoring_debug : Bool
Enable monitoring debug logging. Default: False
resource_monitoring_enabled : boolean
Expand All @@ -96,7 +93,6 @@ def __init__(self,
self.hub_port_range = hub_port_range

self.logging_endpoint = logging_endpoint
self.logdir = logdir
self.monitoring_debug = monitoring_debug

self.workflow_name = workflow_name
Expand All @@ -109,13 +105,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

logger.debug("Starting MonitoringHub")

if self.logdir is None:
self.logdir = "."

if self.logging_endpoint is None:
self.logging_endpoint = f"sqlite:///{os.fspath(config_run_dir)}/monitoring.db"

os.makedirs(self.logdir, exist_ok=True)
os.makedirs(dfk_run_dir, exist_ok=True)

self.monitoring_hub_active = True

Expand Down Expand Up @@ -151,7 +144,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
"hub_address": self.hub_address,
"udp_port": self.hub_port,
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"run_dir": dfk_run_dir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
},
name="Monitoring-Router-Process",
Expand All @@ -161,7 +154,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.resource_msgs,),
kwargs={"logdir": self.logdir,
kwargs={"run_dir": dfk_run_dir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
},
Expand All @@ -172,7 +165,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid)

self.filesystem_proc = ForkProcess(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, dfk_run_dir),
args=(self.resource_msgs, dfk_run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
Expand Down Expand Up @@ -258,8 +251,8 @@ def close(self) -> None:


@wrap_with_logs
def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
name="monitoring_filesystem_radio",
level=logging.INFO)

Expand Down
12 changes: 6 additions & 6 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self,
zmq_port_range: Tuple[int, int] = (55050, 56000),

monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
run_dir: str = ".",
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
resource_msgs: mpq.Queue,
Expand All @@ -48,7 +48,7 @@ def __init__(self,
zmq_port_range : tuple(int, int)
The MonitoringHub picks ports at random from the range which will be used by Hub.
Default: (55050, 56000)
logdir : str
run_dir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
Expand All @@ -59,8 +59,8 @@ def __init__(self,
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
os.makedirs(run_dir, exist_ok=True)
self.logger = set_file_logger(f"{run_dir}/monitoring_router.log",
name="monitoring_router",
level=logging_level)
self.logger.debug("Monitoring router starting")
Expand Down Expand Up @@ -187,14 +187,14 @@ def router_starter(*,
udp_port: Optional[int],
zmq_port_range: Tuple[int, int],

logdir: str,
run_dir: str,
logging_level: int) -> None:
setproctitle("parsl: monitoring router")
try:
router = MonitoringRouter(hub_address=hub_address,
udp_port=udp_port,
zmq_port_range=zmq_port_range,
logdir=logdir,
run_dir=run_dir,
logging_level=logging_level,
resource_msgs=resource_msgs,
exit_event=exit_event)
Expand Down

0 comments on commit 9fb5269

Please sign in to comment.