Skip to content

Commit

Permalink
work in progress for #168
Browse files Browse the repository at this point in the history
  • Loading branch information
mmguero committed Apr 6, 2023
1 parent 393bc21 commit c6d206d
Showing 1 changed file with 118 additions and 70 deletions.
188 changes: 118 additions & 70 deletions shared/bin/watch_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-

import os
import json
import time

from malcolm_utils import AtomicInt, ContextLockedOrderedDict, same_file_or_dir
Expand All @@ -26,9 +27,12 @@
from watchdog.utils import WatchdogShutdown
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from collections import namedtuple, defaultdict

ASSUME_CLOSED_SEC_DEFAULT = 10

OperationEvent = namedtuple("OperationEvent", ["timestamp", "operation"], rename=False)


###################################################################################################
class FileOperationEventHandler(FileSystemEventHandler):
Expand All @@ -41,11 +45,14 @@ def __init__(
):
super().__init__(*args, **kwargs)
self.polling = polling
# items at the first (idx=0) of this OrderedDict are the
# oldest, items at the last (idx=len-1) are the newest
self.deck = ContextLockedOrderedDict()
self.logger = logger
self.updateTime()
# self.deck is a dictionary mapping filenames to a list of OperationEvent of length n,
# with [0] being the oldest timestamp/operation and [n-1] being the newest
# timestamp/operation.
# In self.dec itself, items at the first (idx=0) of this OrderedDict are the
# oldest, items at the last (idx=len-1) are the newest.
self.deck = ContextLockedOrderedDict()

def done(self):
return True
Expand All @@ -54,48 +61,79 @@ def updateTime(self):
self.nowTime = int(time.time())

def on_any_event(self, event):
if not event.is_directory:
self.updateTime()
if self.logger:
self.logger.info(f"{self.nowTime}: {event.event_type} {event.src_path}")

def on_created(self, event):
self.on_modified(event)

def on_modified(self, event):
if not event.is_directory:
with self.deck as d:
d[event.src_path] = self.nowTime
d.move_to_end(event.src_path, last=True)

def on_moved(self, event):
if not event.is_directory:
if isinstance(event, FileSystemMovedEvent) and same_file_or_dir(
os.path.dirname(event.src_path), os.path.dirname(event.dest_path)
):
# a file was simply renamed in the watched directory (not moved
# from some other directory) so just update the filename
fName = None
try:
if not event.is_directory:
self.updateTime()

# FileClosedEvent is only going to come from inotify events, not polling
# so we know we're good to go (a FileClosedEvent signals we can process the
# file immediately). We can signal this by setting the timestamp to 0.
newOpLog = OperationEvent(
self.nowTime if (not isinstance(event, FileClosedEvent)) else 0, event.event_type
)

# if this is a move event, we need to track the old and new filenames
if isinstance(event, FileSystemMovedEvent):
fName = event.dest_path
fNameOld = event.src_path
if self.logger:
self.logger.info(f"↦\t{event.event_type: >10}\t{event.src_path} {event.dest_path}")
else:
fName = event.src_path
fNameOld = None
if self.logger:
self.logger.info(f"🗲\t{event.event_type: <10}\t{event.src_path}")

with self.deck as d:
d.pop(event.src_path, self.nowTime)
d[event.dest_path] = self.nowTime
d.move_to_end(event.dest_path, last=True)
else:
# the file was moved from somewhere else, treat it as a create
self.on_created(event)

def on_closed(self, event):
# on_closed is only going to come from inotify events, not polling
# so we know we're good to go. set its time to expire immediately in the worker
if not event.is_directory:
with self.deck as d:
d[event.src_path] = 0
d.move_to_end(event.src_path, last=False)

def on_deleted(self, event):
# if a file is deleted I guess we don't need to track it any more
if not event.is_directory:
with self.deck as d:
d.pop(event.src_path, self.nowTime)
if fNameOld:
if same_file_or_dir(os.path.dirname(fNameOld), os.path.dirname(fName)):
# a file was simply renamed in the watched directory (not moved
# from some other directory) so just update the filename
d.pop(fNameOld, None)
else:
# the file was moved from somewhere else, treat it like a create
pass

# insert or update file event(s)

if fName in d:
# this is a file we're already currently tracking

# if the previous operation (the last one in the history) was the same as this one,
# replace the operation rather than appending a new one (effectively just updating the timestamp)
if (len(d[fName]) > 0) and (d[fName][-1].operation == event.event_type):
d[fName][-1] = newOpLog

else:
# otherwise append a new history item
d[fName].append(newOpLog)

else:
# this is a file we were not previously tracking
d[fName] = [newOpLog]

if (
isinstance(event, FileModifiedEvent)
or isinstance(event, FileClosedEvent)
or isinstance(event, FileCreatedEvent)
or isinstance(event, FileSystemMovedEvent)
):
# put FileClosedEvent events (which now have a timestamp of 0) at the front of
# the deck (to be processed first), and others to the back
d.move_to_end(fName, last=d[fName][-1] > 0)

elif isinstance(event, FileDeletedEvent):
# if a file is deleted I guess we don't need to track it any more
d.pop(fName, None)
fName = None

if self.logger and fName:
self.logger.debug(f"⎗\t{fName}\t{json.dumps(d[fName])}")

except Exception as e:
if self.logger:
self.logger.error(f"⨳\t{fName}\t{e}")


###################################################################################################
Expand All @@ -112,37 +150,47 @@ def ProcessFileEventWorker(workerArgs):
)

with workerThreadCount as workerId:
if logger is not None:
logger.info(f"[{workerId}]:started")
if logger:
logger.info(f"۞\tstarted\t[{workerId}]")

while (not shutDown[0]) and observer.is_alive():
time.sleep(1)
time.sleep(0.5)
nowTime = int(time.time())

with handler.deck as d:
for fileName, eventTime in list(d.items()):
if nowTime < eventTime + assumeClosedSec:
# we can break because the list is ordered
break
else:
del d[fileName]
if fileProcessor is not None:
extraArgs = (
fileProcessorKwargs
if fileProcessorKwargs and isinstance(fileProcessorKwargs, dict)
else {}
)
fileProcessor(
fileName,
**extraArgs,
)
if logger is not None:
logger.info(
f"processed {fileName} at {(nowTime-eventTime) if (eventTime > 0) else 0} seconds"
)
for fileName, fileHistory in list(d.items()):
if logger:
logger.debug(f"⏿ checking {fileName}\t{json.dumps(fileHistory)}\t[{workerId}]")

if len(fileHistory) > 0:
if nowTime < fileHistory[-1].timestamp + assumeClosedSec:
# we can break because the list is ordered
if logger:
logger.debug(
f"⎊\tbreaking early because {nowTime} < {fileHistory[-1].timestamp + assumeClosedSec}\t[{workerId}]"
)
break

else:
del d[fileName]
if fileProcessor is not None:
extraArgs = (
fileProcessorKwargs
if fileProcessorKwargs and isinstance(fileProcessorKwargs, dict)
else {}
)
fileProcessor(
fileName,
**extraArgs,
)
if logger:
logger.info(
f"🖄\tprocessed\t{fileName} at {(nowTime-fileHistory[-1].timestamp) if (fileHistory[-1].timestamp > 0) else 0} seconds\t[{workerId}]"
)

time.sleep(1)
if logger is not None:
logger.info(f"[{workerId}]: finished")
if logger:
logger.info(f"\tfinished\t[{workerId}]")


def WatchAndProcessDirectory(
Expand All @@ -161,7 +209,7 @@ def WatchAndProcessDirectory(
)
for directory in directories:
if logger:
logger.info(f"Scheduling {directory}")
logger.info(f"🗐\tScheduling {directory}")
observer.schedule(handler, directory, recursive=True)

observer.start()
Expand Down

0 comments on commit c6d206d

Please sign in to comment.