From c6d206d9f69546f9eb76ca99e5f510dfb8f5520c Mon Sep 17 00:00:00 2001
From: Seth Grover
Date: Thu, 6 Apr 2023 09:52:06 -0600
Subject: [PATCH] work in progress for idaholab/Malcolm#168

---
 shared/bin/watch_common.py | 188 +++++++++++++++++++++++--------------
 1 file changed, 118 insertions(+), 70 deletions(-)

diff --git a/shared/bin/watch_common.py b/shared/bin/watch_common.py
index 9ff1af62a..01570fe67 100644
--- a/shared/bin/watch_common.py
+++ b/shared/bin/watch_common.py
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import os
+import json
 import time
 
 from malcolm_utils import AtomicInt, ContextLockedOrderedDict, same_file_or_dir
@@ -26,9 +27,12 @@
 from watchdog.utils import WatchdogShutdown
 from watchdog.observers import Observer
 from watchdog.observers.polling import PollingObserver
+from collections import namedtuple, defaultdict
 
 ASSUME_CLOSED_SEC_DEFAULT = 10
 
+OperationEvent = namedtuple("OperationEvent", ["timestamp", "operation"], rename=False)
+
 
 ###################################################################################################
 class FileOperationEventHandler(FileSystemEventHandler):
@@ -41,11 +45,14 @@ def __init__(
     ):
         super().__init__(*args, **kwargs)
         self.polling = polling
-        # items at the first (idx=0) of this OrderedDict are the
-        # oldest, items at the last (idx=len-1) are the newest
-        self.deck = ContextLockedOrderedDict()
         self.logger = logger
         self.updateTime()
+        # self.deck is a dictionary mapping filenames to a list of OperationEvent of length n,
+        # with [0] being the oldest timestamp/operation and [n-1] being the newest
+        # timestamp/operation.
+        # In self.deck itself, items at the first (idx=0) of this OrderedDict are the
+        # oldest, items at the last (idx=len-1) are the newest.
+        self.deck = ContextLockedOrderedDict()
 
     def done(self):
         return True
@@ -54,48 +61,79 @@ def updateTime(self):
         self.nowTime = int(time.time())
 
     def on_any_event(self, event):
-        if not event.is_directory:
-            self.updateTime()
-            if self.logger:
-                self.logger.info(f"{self.nowTime}: {event.event_type} {event.src_path}")
-
-    def on_created(self, event):
-        self.on_modified(event)
-
-    def on_modified(self, event):
-        if not event.is_directory:
-            with self.deck as d:
-                d[event.src_path] = self.nowTime
-                d.move_to_end(event.src_path, last=True)
-
-    def on_moved(self, event):
-        if not event.is_directory:
-            if isinstance(event, FileSystemMovedEvent) and same_file_or_dir(
-                os.path.dirname(event.src_path), os.path.dirname(event.dest_path)
-            ):
-                # a file was simply renamed in the watched directory (not moved
-                # from some other directory) so just update the filename
+        fName = None
+        try:
+            if not event.is_directory:
+                self.updateTime()
+
+                # FileClosedEvent is only going to come from inotify events, not polling
+                # so we know we're good to go (a FileClosedEvent signals we can process the
+                # file immediately). We can signal this by setting the timestamp to 0.
+                newOpLog = OperationEvent(
+                    self.nowTime if (not isinstance(event, FileClosedEvent)) else 0, event.event_type
+                )
+
+                # if this is a move event, we need to track the old and new filenames
+                if isinstance(event, FileSystemMovedEvent):
+                    fName = event.dest_path
+                    fNameOld = event.src_path
+                    if self.logger:
+                        self.logger.info(f"â†Ķ\t{event.event_type: >10}\t{event.src_path} {event.dest_path}")
+                else:
+                    fName = event.src_path
+                    fNameOld = None
+                    if self.logger:
+                        self.logger.info(f"ðŸ—ē\t{event.event_type: <10}\t{event.src_path}")
+
                 with self.deck as d:
-                    d.pop(event.src_path, self.nowTime)
-                    d[event.dest_path] = self.nowTime
-                    d.move_to_end(event.dest_path, last=True)
-            else:
-                # the file was moved from somewhere else, treat it as a create
-                self.on_created(event)
-
-    def on_closed(self, event):
-        # on_closed is only going to come from inotify events, not polling
-        # so we know we're good to go. set its time to expire immediately in the worker
-        if not event.is_directory:
-            with self.deck as d:
-                d[event.src_path] = 0
-                d.move_to_end(event.src_path, last=False)
-
-    def on_deleted(self, event):
-        # if a file is deleted I guess we don't need to track it any more
-        if not event.is_directory:
-            with self.deck as d:
-                d.pop(event.src_path, self.nowTime)
+                    if fNameOld:
+                        if same_file_or_dir(os.path.dirname(fNameOld), os.path.dirname(fName)):
+                            # a file was simply renamed in the watched directory (not moved
+                            # from some other directory) so just update the filename
+                            d.pop(fNameOld, None)
+                        else:
+                            # the file was moved from somewhere else, treat it like a create
+                            pass
+
+                    # insert or update file event(s)
+
+                    if fName in d:
+                        # this is a file we're already currently tracking
+
+                        # if the previous operation (the last one in the history) was the same as this one,
+                        # replace the operation rather than appending a new one (effectively just updating the timestamp)
+                        if (len(d[fName]) > 0) and (d[fName][-1].operation == event.event_type):
+                            d[fName][-1] = newOpLog
+
+                        else:
+                            # otherwise append a new history item
+                            d[fName].append(newOpLog)
+
+                    else:
+                        # this is a file we were not previously tracking
+                        d[fName] = [newOpLog]
+
+                    if (
+                        isinstance(event, FileModifiedEvent)
+                        or isinstance(event, FileClosedEvent)
+                        or isinstance(event, FileCreatedEvent)
+                        or isinstance(event, FileSystemMovedEvent)
+                    ):
+                        # put FileClosedEvent events (which now have a timestamp of 0) at the front of
+                        # the deck (to be processed first), and others to the back
+                        d.move_to_end(fName, last=d[fName][-1].timestamp > 0)
+
+                    elif isinstance(event, FileDeletedEvent):
+                        # if a file is deleted I guess we don't need to track it any more
+                        d.pop(fName, None)
+                        fName = None
+
+                    if self.logger and fName:
+                        self.logger.debug(f"⎗\t{fName}\t{json.dumps(d[fName])}")
+
+        except Exception as e:
+            if self.logger:
+                self.logger.error(f"âĻģ\t{fName}\t{e}")
 
 
 ###################################################################################################
@@ -112,37 +150,47 @@ def ProcessFileEventWorker(workerArgs):
     )
 
     with workerThreadCount as workerId:
-        if logger is not None:
-            logger.info(f"[{workerId}]:started")
+        if logger:
+            logger.info(f"۞\tstarted\t[{workerId}]")
 
         while (not shutDown[0]) and observer.is_alive():
-            time.sleep(1)
+            time.sleep(0.5)
             nowTime = int(time.time())
+
             with handler.deck as d:
-                for fileName, eventTime in list(d.items()):
-                    if nowTime < eventTime + assumeClosedSec:
-                        # we can break because the list is ordered
-                        break
-                    else:
-                        del d[fileName]
-                        if fileProcessor is not None:
-                            extraArgs = (
-                                fileProcessorKwargs
-                                if fileProcessorKwargs and isinstance(fileProcessorKwargs, dict)
-                                else {}
-                            )
-                            fileProcessor(
-                                fileName,
-                                **extraArgs,
-                            )
-                        if logger is not None:
-                            logger.info(
-                                f"processed {fileName} at {(nowTime-eventTime) if (eventTime > 0) else 0} seconds"
-                            )
+                for fileName, fileHistory in list(d.items()):
+                    if logger:
+                        logger.debug(f"âŋ checking {fileName}\t{json.dumps(fileHistory)}\t[{workerId}]")
+
+                    if len(fileHistory) > 0:
+                        if nowTime < fileHistory[-1].timestamp + assumeClosedSec:
+                            # we can break because the list is ordered
+                            if logger:
+                                logger.debug(
+                                    f"⎊\tbreaking early because {nowTime} < {fileHistory[-1].timestamp + assumeClosedSec}\t[{workerId}]"
+                                )
+                            break
+
+                        else:
+                            del d[fileName]
+                            if fileProcessor is not None:
+                                extraArgs = (
+                                    fileProcessorKwargs
+                                    if fileProcessorKwargs and isinstance(fileProcessorKwargs, dict)
+                                    else {}
+                                )
+                                fileProcessor(
+                                    fileName,
+                                    **extraArgs,
+                                )
+                            if logger:
+                                logger.info(
+                                    f"🖄\tprocessed\t{fileName} at {(nowTime-fileHistory[-1].timestamp) if (fileHistory[-1].timestamp > 0) else 0} seconds\t[{workerId}]"
+                                )
         time.sleep(1)
 
-    if logger is not None:
-        logger.info(f"[{workerId}]: finished")
+    if logger:
+        logger.info(f"⛒\tfinished\t[{workerId}]")
 
 
 def WatchAndProcessDirectory(
@@ -161,7 +209,7 @@
     )
     for directory in directories:
         if logger:
-            logger.info(f"Scheduling {directory}")
+            logger.info(f"🗐\tScheduling {directory}")
         observer.schedule(handler, directory, recursive=True)
 
     observer.start()
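
Not part of the patch itself, but as a reading aid for the mechanism it introduces: each watched file maps to a short history of (timestamp, operation) tuples, a close event is recorded with timestamp 0 so it jumps to the front of the deck, and the worker hands a file off once its newest event is at least assumeClosedSec old. The snippet below is a minimal, self-contained sketch of that bookkeeping using a plain OrderedDict and hypothetical record()/flush()/process() helpers; it leaves out Malcolm's ContextLockedOrderedDict locking, watchdog wiring, and logging.

import time
from collections import OrderedDict, namedtuple

OperationEvent = namedtuple("OperationEvent", ["timestamp", "operation"])

ASSUME_CLOSED_SEC = 10

# filename -> list of OperationEvent, oldest first (newest is [-1]); stands in for
# self.deck above, minus the locking that ContextLockedOrderedDict provides
deck = OrderedDict()


def record(filename, operation, closed=False):
    # a "closed" event gets timestamp 0 so the worker flushes the file immediately
    entry = OperationEvent(0 if closed else int(time.time()), operation)
    history = deck.setdefault(filename, [])
    if history and history[-1].operation == operation:
        # same operation as last time: just refresh the newest entry's timestamp
        history[-1] = entry
    else:
        history.append(entry)
    # timestamp 0 sorts to the front of the deck, everything else to the back
    deck.move_to_end(filename, last=history[-1].timestamp > 0)


def flush(process):
    # hand off files whose newest event is old enough (or marked closed)
    now = int(time.time())
    for filename, history in list(deck.items()):
        if now < history[-1].timestamp + ASSUME_CLOSED_SEC:
            # the deck is ordered, so nothing further down is ready either
            break
        del deck[filename]
        process(filename)


if __name__ == "__main__":
    record("/tmp/example.pcap", "created")
    record("/tmp/example.pcap", "modified")
    record("/tmp/example.pcap", "closed", closed=True)
    flush(print)  # prints /tmp/example.pcap right away because of the timestamp-0 entry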