Skip to content

Commit

Permalink
move more stuff to actor, async calls in pvlibupdatescontroller
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Mattiello <[email protected]>
  • Loading branch information
JoeMatt committed Nov 15, 2024
1 parent 55005fe commit ecf3316
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 101 deletions.
219 changes: 120 additions & 99 deletions PVLibrary/Sources/PVLibrary/DirectoryWatcher/DirectoryWatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ public extension NSNotification.Name {
}

let TIMER_DELAY_IN_SECONDS = 2.0

/// The directory watcher class
import Perception



/// A class that watches a directory for changes and handles file operations
///
/// The DirectoryWatcher monitors a specified directory for new files and changes,
/// handling archive extraction and file processing automatically.
#if !os(tvOS)
@Observable
#else
Expand All @@ -77,11 +81,6 @@ public final class DirectoryWatcher: ObservableObject {
private var dispatchSource: DispatchSourceFileSystemObject?
/// The serial queue for the extractor
private let serialQueue = DispatchQueue(label: "org.provenance-emu.provenance.serialExtractorQueue")
/// The file watchers for the files being watched
private var fileWatchers: [URL: DispatchSourceFileSystemObject] = [:]
private var lastKnownSizes: [URL: Int64] = [:]
private var fileTimers: [URL: Timer] = [:]
private var lastKnownModificationDates: [URL: Date] = [:]

/// The extractors for the supported archive types
private let extractors: [ArchiveType: ArchiveExtractor] = [
Expand Down Expand Up @@ -150,7 +149,9 @@ public final class DirectoryWatcher: ObservableObject {

dispatchSource = DispatchSource.makeFileSystemObjectSource(fileDescriptor: fileDescriptor, eventMask: .write, queue: serialQueue)
dispatchSource?.setEventHandler { [weak self] in
self?.handleFileSystemEvent()
Task {
await self?.handleFileSystemEvent()
}
}
dispatchSource?.setCancelHandler {
ILOG("Closing file descriptor for directory: \(self.watchedDirectory.path)")
Expand All @@ -166,7 +167,10 @@ public final class DirectoryWatcher: ObservableObject {
dispatchSource?.cancel()
dispatchSource = nil
if includingFileWatchers {
fileWatchers.keys.forEach { stopWatchingFile(at: $0) }
Task {
let paths = await watcherManager.getWatchedPaths()
paths.forEach { stopWatchingFile(at: $0) }
}
}
ILOG("Monitoring stopped for directory: \(watchedDirectory.path)")
}
Expand Down Expand Up @@ -286,23 +290,22 @@ public final class DirectoryWatcher: ObservableObject {
/// Stop watching a file
private func stopWatchingFile(at path: URL) {
ILOG("Stopping watch for file: \(path.lastPathComponent)")
fileWatchers[path]?.cancel()
fileWatchers[path] = nil
fileTimers[path]?.invalidate()
ILOG("Timer invalidated for file: \(path.lastPathComponent)")
fileTimers[path] = nil
lastKnownSizes[path] = nil
lastKnownModificationDates[path] = nil
ILOG("File watcher, timer, last known size, and last known modification date removed for: \(path.lastPathComponent)")
Task {
await watcherManager.removeWatcher(for: path)
ILOG("File watcher removed for: \(path.lastPathComponent)")
}
}

/// Cleanup nonexistent file watchers
private func cleanupNonexistentFileWatchers() {
ILOG("Starting cleanup of nonexistent file watchers")
for path in fileWatchers.keys {
if !FileManager.default.fileExists(atPath: path.path) {
stopWatchingFile(at: path)
ILOG("Removed watcher for nonexistent file: \(path.lastPathComponent)")
Task {
let paths = await watcherManager.getWatchedPaths()
for path in paths {
if !FileManager.default.fileExists(atPath: path.path) {
stopWatchingFile(at: path)
ILOG("Removed watcher for nonexistent file: \(path.lastPathComponent)")
}
}
}
ILOG("Finished cleanup of nonexistent file watchers")
Expand Down Expand Up @@ -357,15 +360,15 @@ fileprivate extension DirectoryWatcher {
}

/// Handle a file system event
private func handleFileSystemEvent() {
private func handleFileSystemEvent() async {
ILOG("Handling file system event for directory: \(watchedDirectory.path)")
do {
let contents = try FileManager.default.contentsOfDirectory(at: watchedDirectory,
includingPropertiesForKeys: nil,
options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants])
ILOG("Found \(contents.count) items in directory after file system event")
contents.filter(isValidFile).forEach { file in
if !isWatchingFile(at: file) {
await contents.filter(isValidFile).asyncForEach { file in
if await !isWatchingFile(at: file) {
ILOG("Starting to watch new file: \(file.lastPathComponent)")
watchFile(at: file)
}
Expand Down Expand Up @@ -398,6 +401,15 @@ fileprivate extension DirectoryWatcher {
private func watchFile(at path: URL) {
Task {
ILOG("Starting to watch file: \(path.lastPathComponent)")

// Get initial file attributes
guard let attributes = try? FileManager.default.attributesOfItem(atPath: path.path),
let initialSize = attributes[.size] as? Int64,
let initialModDate = attributes[.modificationDate] as? Date else {
ELOG("Failed to get initial file attributes for: \(path.lastPathComponent)")
return
}

let fileDescriptor = open(path.path, O_EVTONLY)
guard fileDescriptor != -1 else {
ELOG("Error opening file for watching: \(path.path), error: \(String(cString: strerror(errno)))")
Expand All @@ -406,7 +418,7 @@ fileprivate extension DirectoryWatcher {

let source = await watcherManager.createFileSystemSource(
fileDescriptor: fileDescriptor,
eventMask: [.write, .extend],
eventMask: [DispatchSource.FileSystemEvent.write, DispatchSource.FileSystemEvent.extend],
eventHandler: { [weak self] in
Task { @MainActor in
await self?.handleFileChange(at: path)
Expand All @@ -419,7 +431,10 @@ fileprivate extension DirectoryWatcher {
)

source.resume()
await watcherManager.addWatcher(source, for: path)
await watcherManager.addWatcher(source,
for: path,
initialSize: initialSize,
modificationDate: initialModDate)

// Start monitoring file changes
await monitorFileChanges(for: path)
Expand Down Expand Up @@ -462,61 +477,38 @@ fileprivate extension DirectoryWatcher {
let currentSize = attributes[.size] as? Int64 ?? 0
let currentModificationDate = attributes[.modificationDate] as? Date ?? Date()

await watcherManager.updateFileStatus(for: path, size: currentSize, modificationDate: currentModificationDate)
await watcherManager.updateFileStatus(
for: path,
size: currentSize,
modificationDate: currentModificationDate
)

// Schedule status check
await checkFileStatus(at: path)

} catch {
ELOG("Error handling file change: \(error.localizedDescription)")
await watcherManager.removeWatcher(for: path)
}
}

private func checkFileStatus(at path: URL) async {
ILOG("Checking file status for: \(path)")
do {
let attributes = try FileManager.default.attributesOfItem(atPath: path.path)
let currentSize = attributes[.size] as? Int64 ?? 0
let currentModificationDate = attributes[.modificationDate] as? Date

ILOG("Current size of file \(path.lastPathComponent): \(currentSize)")
ILOG("Current modification date of file \(path.lastPathComponent): \(String(describing: currentModificationDate))")

if let lastSize = lastKnownSizes[path] {
ILOG("Last known size of file \(path.lastPathComponent): \(lastSize)")
} else {
ILOG("No last known size for file \(path.lastPathComponent)")
}
guard let status = await watcherManager.getFileStatus(for: path) else {
ELOG("No status found for file: \(path.lastPathComponent)")
return
}

if let lastModDate = lastKnownModificationDates[path] {
ILOG("Last known modification date of file \(path.lastPathComponent): \(lastModDate)")
} else {
ILOG("No last known modification date for file \(path.lastPathComponent)")
}
let attributes = try? FileManager.default.attributesOfItem(atPath: path.path)
let currentSize = attributes?[.size] as? Int64 ?? 0

// If the file size and modification date haven't changed in 2 seconds, consider it done
if let lastSize = lastKnownSizes[path],
let lastModDate = lastKnownModificationDates[path],
lastSize == currentSize,
lastModDate == currentModificationDate {
ILOG("File upload completed: \(path.lastPathComponent)")
stopWatchingFile(at: path)
handleCompletedFile(at: path)
// If file size hasn't changed for a while, process it
if currentSize > 0 && currentSize == status.size {
if isArchive(path) {
ILOG("Archive file appears complete, starting extraction: \(path.lastPathComponent)")
try? await extractArchive(at: path)
} else {
if lastKnownSizes[path] != currentSize {
ILOG("File size changed for \(path.lastPathComponent)")
}
if lastKnownModificationDates[path] != currentModificationDate {
ILOG("File modification date changed for \(path.lastPathComponent)")
}
lastKnownSizes[path] = currentSize
lastKnownModificationDates[path] = currentModificationDate
VLOG("File still uploading: \(path.lastPathComponent), current size: \(currentSize)")
ILOG("Non-archive file appears complete: \(path.lastPathComponent)")
completedFilesContinuation?.yield([path])
}
} catch {
ELOG("Error checking file status: \(error)")
stopWatchingFile(at: path)
await watcherManager.removeWatcher(for: path)
}
}

Expand All @@ -534,14 +526,14 @@ fileprivate extension DirectoryWatcher {
}
}

public func isWatchingFile(at path: URL) -> Bool {
let isWatching = fileWatchers[path] != nil && !fileWatchers[path]!.isCancelled
public func isWatchingFile(at path: URL) async -> Bool {
let isWatching = await watcherManager.isWatching(path)
ILOG("Checked if watching file: \(path.lastPathComponent), result: \(isWatching)")
return isWatching
}

public func isWatchingAnyFile() -> Bool {
return !fileWatchers.isEmpty
public func isWatchingAnyFile() async -> Bool {
return await !watcherManager.hasActiveWatchers()
}
}

Expand Down Expand Up @@ -572,7 +564,7 @@ extension DirectoryWatcher {
Task {
// Delay starting the extraction to allow the file system to settle
ILOG("Scheduling delayed extraction for file: \(destinationURL.lastPathComponent)")
try await Task.sleep(nanoseconds: 1_000_000_000) // 1 second delay
try await Task.sleep(nanoseconds: 1_500_000_000) // 1.5 second delay
try await self.extractArchive(at: destinationURL)
}
} catch {
Expand Down Expand Up @@ -659,55 +651,84 @@ func delay(_ duration: TimeInterval, operation: @escaping () async throws -> Voi
}

private actor FileWatcherManager {
private var fileWatchers: [URL: DispatchSourceFileSystemObject] = [:]
private var lastKnownSizes: [URL: Int64] = [:]
private var fileTimers: [URL: Timer] = [:]
private var lastKnownModificationDates: [URL: Date] = [:]
private struct FileStatus {
var watcher: DispatchSourceFileSystemObject
var size: Int64
var modificationDate: Date
var timer: Timer?

mutating func update(size: Int64? = nil,
modificationDate: Date? = nil,
timer: Timer? = nil) {
if let size = size {
self.size = size
}
if let modificationDate = modificationDate {
self.modificationDate = modificationDate
}
if let timer = timer {
self.timer?.invalidate()
self.timer = timer
}
}
}

private var fileStatuses: [URL: FileStatus] = [:]
private let serialQueue: DispatchQueue

init(label: String) {
self.serialQueue = DispatchQueue(label: label)
}

func addWatcher(_ source: DispatchSourceFileSystemObject, for path: URL) {
fileWatchers[path] = source
func addWatcher(_ source: DispatchSourceFileSystemObject,
for path: URL,
initialSize: Int64,
modificationDate: Date) {
let status = FileStatus(
watcher: source,
size: initialSize,
modificationDate: modificationDate,
timer: nil
)
fileStatuses[path] = status
}

func removeWatcher(for path: URL) {
fileWatchers[path]?.cancel()
fileWatchers[path] = nil
fileTimers[path]?.invalidate()
fileTimers[path] = nil
lastKnownSizes[path] = nil
lastKnownModificationDates[path] = nil
if let status = fileStatuses[path] {
status.watcher.cancel()
status.timer?.invalidate()
}
fileStatuses[path] = nil
}

func updateFileStatus(for path: URL, size: Int64, modificationDate: Date) {
lastKnownSizes[path] = size
lastKnownModificationDates[path] = modificationDate
func updateFileStatus(for path: URL, size: Int64? = nil, modificationDate: Date? = nil, timer: Timer? = nil) {
fileStatuses[path]?.update(size: size, modificationDate: modificationDate, timer: timer)
}

func getFileStatus(for path: URL) -> (size: Int64?, modificationDate: Date?) {
return (lastKnownSizes[path], lastKnownModificationDates[path])
func getFileStatus(for path: URL) -> (size: Int64, modificationDate: Date)? {
guard let status = fileStatuses[path] else { return nil }
return (status.size, status.modificationDate)
}

func isWatching(_ path: URL) -> Bool {
return fileWatchers[path] != nil && !fileWatchers[path]!.isCancelled
guard let status = fileStatuses[path] else { return false }
return !status.watcher.isCancelled
}

func hasActiveWatchers() -> Bool {
return !fileWatchers.isEmpty
return !fileStatuses.isEmpty
}

func updateTimer(for path: URL, timer: Timer?) {
fileTimers[path]?.invalidate()
fileTimers[path] = timer
func getWatchedPaths() -> [URL] {
Array(fileStatuses.keys)
}

func createFileSystemSource(fileDescriptor: Int32,
eventMask: DispatchSource.FileSystemEvent,
eventHandler: @escaping () -> Void,
cancelHandler: @escaping () -> Void) -> DispatchSourceFileSystemObject {
func createFileSystemSource(
fileDescriptor: Int32,
eventMask: DispatchSource.FileSystemEvent,
eventHandler: @escaping () -> Void,
cancelHandler: @escaping () -> Void
) -> DispatchSourceFileSystemObject {
let source = DispatchSource.makeFileSystemObjectSource(
fileDescriptor: fileDescriptor,
eventMask: eventMask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,15 @@ public final class PVGameLibraryUpdatesController: ObservableObject {
for await extractedFiles in directoryWatcher.extractedFilesStream(at: importPath) {
var readyURLs:[URL] = []
for url in extractedFiles {
if (!directoryWatcher.isWatchingFile(at: url)) {
if await (!directoryWatcher.isWatchingFile(at: url)) {
readyURLs.append(url)
}
}
if (!readyURLs.isEmpty) {
gameImporter.addImports(forPaths: readyURLs)
}

if (!directoryWatcher.isWatchingAnyFile()) {
if await (!directoryWatcher.isWatchingAnyFile()) {
ILOG("I think all the imports are settled, might be ok to start the queue")
gameImporter.startProcessing()
}
Expand Down

0 comments on commit ecf3316

Please sign in to comment.