From ecf3316fb7ed82efb562b805cec6333b7503523b Mon Sep 17 00:00:00 2001 From: Joseph Mattiello Date: Thu, 14 Nov 2024 19:22:25 -0500 Subject: [PATCH] move more stuff to actor, async calls in pvlibupdatescontroller Signed-off-by: Joseph Mattiello --- .../DirectoryWatcher/DirectoryWatcher.swift | 219 ++++++++++-------- .../PVGameLibraryUpdatesController.swift | 4 +- 2 files changed, 122 insertions(+), 101 deletions(-) diff --git a/PVLibrary/Sources/PVLibrary/DirectoryWatcher/DirectoryWatcher.swift b/PVLibrary/Sources/PVLibrary/DirectoryWatcher/DirectoryWatcher.swift index 60c2bc2c0a..56893980b5 100644 --- a/PVLibrary/Sources/PVLibrary/DirectoryWatcher/DirectoryWatcher.swift +++ b/PVLibrary/Sources/PVLibrary/DirectoryWatcher/DirectoryWatcher.swift @@ -59,10 +59,14 @@ public extension NSNotification.Name { } let TIMER_DELAY_IN_SECONDS = 2.0 - -/// The directory watcher class import Perception + + +/// A class that watches a directory for changes and handles file operations +/// +/// The DirectoryWatcher monitors a specified directory for new files and changes, +/// handling archive extraction and file processing automatically. #if !os(tvOS) @Observable #else @@ -77,11 +81,6 @@ public final class DirectoryWatcher: ObservableObject { private var dispatchSource: DispatchSourceFileSystemObject? /// The serial queue for the extractor private let serialQueue = DispatchQueue(label: "org.provenance-emu.provenance.serialExtractorQueue") - /// The file watchers for the files being watched - private var fileWatchers: [URL: DispatchSourceFileSystemObject] = [:] - private var lastKnownSizes: [URL: Int64] = [:] - private var fileTimers: [URL: Timer] = [:] - private var lastKnownModificationDates: [URL: Date] = [:] /// The extractors for the supported archive types private let extractors: [ArchiveType: ArchiveExtractor] = [ @@ -150,7 +149,9 @@ public final class DirectoryWatcher: ObservableObject { dispatchSource = DispatchSource.makeFileSystemObjectSource(fileDescriptor: fileDescriptor, eventMask: .write, queue: serialQueue) dispatchSource?.setEventHandler { [weak self] in - self?.handleFileSystemEvent() + Task { + await self?.handleFileSystemEvent() + } } dispatchSource?.setCancelHandler { ILOG("Closing file descriptor for directory: \(self.watchedDirectory.path)") @@ -166,7 +167,10 @@ public final class DirectoryWatcher: ObservableObject { dispatchSource?.cancel() dispatchSource = nil if includingFileWatchers { - fileWatchers.keys.forEach { stopWatchingFile(at: $0) } + Task { + let paths = await watcherManager.getWatchedPaths() + paths.forEach { stopWatchingFile(at: $0) } + } } ILOG("Monitoring stopped for directory: \(watchedDirectory.path)") } @@ -286,23 +290,22 @@ public final class DirectoryWatcher: ObservableObject { /// Stop watching a file private func stopWatchingFile(at path: URL) { ILOG("Stopping watch for file: \(path.lastPathComponent)") - fileWatchers[path]?.cancel() - fileWatchers[path] = nil - fileTimers[path]?.invalidate() - ILOG("Timer invalidated for file: \(path.lastPathComponent)") - fileTimers[path] = nil - lastKnownSizes[path] = nil - lastKnownModificationDates[path] = nil - ILOG("File watcher, timer, last known size, and last known modification date removed for: \(path.lastPathComponent)") + Task { + await watcherManager.removeWatcher(for: path) + ILOG("File watcher removed for: \(path.lastPathComponent)") + } } /// Cleanup nonexistent file watchers private func cleanupNonexistentFileWatchers() { ILOG("Starting cleanup of nonexistent file watchers") - for path in fileWatchers.keys { - if !FileManager.default.fileExists(atPath: path.path) { - stopWatchingFile(at: path) - ILOG("Removed watcher for nonexistent file: \(path.lastPathComponent)") + Task { + let paths = await watcherManager.getWatchedPaths() + for path in paths { + if !FileManager.default.fileExists(atPath: path.path) { + stopWatchingFile(at: path) + ILOG("Removed watcher for nonexistent file: \(path.lastPathComponent)") + } } } ILOG("Finished cleanup of nonexistent file watchers") @@ -357,15 +360,15 @@ fileprivate extension DirectoryWatcher { } /// Handle a file system event - private func handleFileSystemEvent() { + private func handleFileSystemEvent() async { ILOG("Handling file system event for directory: \(watchedDirectory.path)") do { let contents = try FileManager.default.contentsOfDirectory(at: watchedDirectory, includingPropertiesForKeys: nil, options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants]) ILOG("Found \(contents.count) items in directory after file system event") - contents.filter(isValidFile).forEach { file in - if !isWatchingFile(at: file) { + await contents.filter(isValidFile).asyncForEach { file in + if await !isWatchingFile(at: file) { ILOG("Starting to watch new file: \(file.lastPathComponent)") watchFile(at: file) } @@ -398,6 +401,15 @@ fileprivate extension DirectoryWatcher { private func watchFile(at path: URL) { Task { ILOG("Starting to watch file: \(path.lastPathComponent)") + + // Get initial file attributes + guard let attributes = try? FileManager.default.attributesOfItem(atPath: path.path), + let initialSize = attributes[.size] as? Int64, + let initialModDate = attributes[.modificationDate] as? Date else { + ELOG("Failed to get initial file attributes for: \(path.lastPathComponent)") + return + } + let fileDescriptor = open(path.path, O_EVTONLY) guard fileDescriptor != -1 else { ELOG("Error opening file for watching: \(path.path), error: \(String(cString: strerror(errno)))") @@ -406,7 +418,7 @@ fileprivate extension DirectoryWatcher { let source = await watcherManager.createFileSystemSource( fileDescriptor: fileDescriptor, - eventMask: [.write, .extend], + eventMask: [DispatchSource.FileSystemEvent.write, DispatchSource.FileSystemEvent.extend], eventHandler: { [weak self] in Task { @MainActor in await self?.handleFileChange(at: path) @@ -419,7 +431,10 @@ fileprivate extension DirectoryWatcher { ) source.resume() - await watcherManager.addWatcher(source, for: path) + await watcherManager.addWatcher(source, + for: path, + initialSize: initialSize, + modificationDate: initialModDate) // Start monitoring file changes await monitorFileChanges(for: path) @@ -462,11 +477,13 @@ fileprivate extension DirectoryWatcher { let currentSize = attributes[.size] as? Int64 ?? 0 let currentModificationDate = attributes[.modificationDate] as? Date ?? Date() - await watcherManager.updateFileStatus(for: path, size: currentSize, modificationDate: currentModificationDate) + await watcherManager.updateFileStatus( + for: path, + size: currentSize, + modificationDate: currentModificationDate + ) - // Schedule status check await checkFileStatus(at: path) - } catch { ELOG("Error handling file change: \(error.localizedDescription)") await watcherManager.removeWatcher(for: path) @@ -474,49 +491,24 @@ fileprivate extension DirectoryWatcher { } private func checkFileStatus(at path: URL) async { - ILOG("Checking file status for: \(path)") - do { - let attributes = try FileManager.default.attributesOfItem(atPath: path.path) - let currentSize = attributes[.size] as? Int64 ?? 0 - let currentModificationDate = attributes[.modificationDate] as? Date - - ILOG("Current size of file \(path.lastPathComponent): \(currentSize)") - ILOG("Current modification date of file \(path.lastPathComponent): \(String(describing: currentModificationDate))") - - if let lastSize = lastKnownSizes[path] { - ILOG("Last known size of file \(path.lastPathComponent): \(lastSize)") - } else { - ILOG("No last known size for file \(path.lastPathComponent)") - } + guard let status = await watcherManager.getFileStatus(for: path) else { + ELOG("No status found for file: \(path.lastPathComponent)") + return + } - if let lastModDate = lastKnownModificationDates[path] { - ILOG("Last known modification date of file \(path.lastPathComponent): \(lastModDate)") - } else { - ILOG("No last known modification date for file \(path.lastPathComponent)") - } + let attributes = try? FileManager.default.attributesOfItem(atPath: path.path) + let currentSize = attributes?[.size] as? Int64 ?? 0 - // If the file size and modification date haven't changed in 2 seconds, consider it done - if let lastSize = lastKnownSizes[path], - let lastModDate = lastKnownModificationDates[path], - lastSize == currentSize, - lastModDate == currentModificationDate { - ILOG("File upload completed: \(path.lastPathComponent)") - stopWatchingFile(at: path) - handleCompletedFile(at: path) + // If file size hasn't changed for a while, process it + if currentSize > 0 && currentSize == status.size { + if isArchive(path) { + ILOG("Archive file appears complete, starting extraction: \(path.lastPathComponent)") + try? await extractArchive(at: path) } else { - if lastKnownSizes[path] != currentSize { - ILOG("File size changed for \(path.lastPathComponent)") - } - if lastKnownModificationDates[path] != currentModificationDate { - ILOG("File modification date changed for \(path.lastPathComponent)") - } - lastKnownSizes[path] = currentSize - lastKnownModificationDates[path] = currentModificationDate - VLOG("File still uploading: \(path.lastPathComponent), current size: \(currentSize)") + ILOG("Non-archive file appears complete: \(path.lastPathComponent)") + completedFilesContinuation?.yield([path]) } - } catch { - ELOG("Error checking file status: \(error)") - stopWatchingFile(at: path) + await watcherManager.removeWatcher(for: path) } } @@ -534,14 +526,14 @@ fileprivate extension DirectoryWatcher { } } - public func isWatchingFile(at path: URL) -> Bool { - let isWatching = fileWatchers[path] != nil && !fileWatchers[path]!.isCancelled + public func isWatchingFile(at path: URL) async -> Bool { + let isWatching = await watcherManager.isWatching(path) ILOG("Checked if watching file: \(path.lastPathComponent), result: \(isWatching)") return isWatching } - public func isWatchingAnyFile() -> Bool { - return !fileWatchers.isEmpty + public func isWatchingAnyFile() async -> Bool { + return await !watcherManager.hasActiveWatchers() } } @@ -572,7 +564,7 @@ extension DirectoryWatcher { Task { // Delay starting the extraction to allow the file system to settle ILOG("Scheduling delayed extraction for file: \(destinationURL.lastPathComponent)") - try await Task.sleep(nanoseconds: 1_000_000_000) // 1 second delay + try await Task.sleep(nanoseconds: 1_500_000_000) // 1.5 second delay try await self.extractArchive(at: destinationURL) } } catch { @@ -659,55 +651,84 @@ func delay(_ duration: TimeInterval, operation: @escaping () async throws -> Voi } private actor FileWatcherManager { - private var fileWatchers: [URL: DispatchSourceFileSystemObject] = [:] - private var lastKnownSizes: [URL: Int64] = [:] - private var fileTimers: [URL: Timer] = [:] - private var lastKnownModificationDates: [URL: Date] = [:] + private struct FileStatus { + var watcher: DispatchSourceFileSystemObject + var size: Int64 + var modificationDate: Date + var timer: Timer? + + mutating func update(size: Int64? = nil, + modificationDate: Date? = nil, + timer: Timer? = nil) { + if let size = size { + self.size = size + } + if let modificationDate = modificationDate { + self.modificationDate = modificationDate + } + if let timer = timer { + self.timer?.invalidate() + self.timer = timer + } + } + } + + private var fileStatuses: [URL: FileStatus] = [:] private let serialQueue: DispatchQueue init(label: String) { self.serialQueue = DispatchQueue(label: label) } - func addWatcher(_ source: DispatchSourceFileSystemObject, for path: URL) { - fileWatchers[path] = source + func addWatcher(_ source: DispatchSourceFileSystemObject, + for path: URL, + initialSize: Int64, + modificationDate: Date) { + let status = FileStatus( + watcher: source, + size: initialSize, + modificationDate: modificationDate, + timer: nil + ) + fileStatuses[path] = status } func removeWatcher(for path: URL) { - fileWatchers[path]?.cancel() - fileWatchers[path] = nil - fileTimers[path]?.invalidate() - fileTimers[path] = nil - lastKnownSizes[path] = nil - lastKnownModificationDates[path] = nil + if let status = fileStatuses[path] { + status.watcher.cancel() + status.timer?.invalidate() + } + fileStatuses[path] = nil } - func updateFileStatus(for path: URL, size: Int64, modificationDate: Date) { - lastKnownSizes[path] = size - lastKnownModificationDates[path] = modificationDate + func updateFileStatus(for path: URL, size: Int64? = nil, modificationDate: Date? = nil, timer: Timer? = nil) { + fileStatuses[path]?.update(size: size, modificationDate: modificationDate, timer: timer) } - func getFileStatus(for path: URL) -> (size: Int64?, modificationDate: Date?) { - return (lastKnownSizes[path], lastKnownModificationDates[path]) + func getFileStatus(for path: URL) -> (size: Int64, modificationDate: Date)? { + guard let status = fileStatuses[path] else { return nil } + return (status.size, status.modificationDate) } func isWatching(_ path: URL) -> Bool { - return fileWatchers[path] != nil && !fileWatchers[path]!.isCancelled + guard let status = fileStatuses[path] else { return false } + return !status.watcher.isCancelled } func hasActiveWatchers() -> Bool { - return !fileWatchers.isEmpty + return !fileStatuses.isEmpty } - func updateTimer(for path: URL, timer: Timer?) { - fileTimers[path]?.invalidate() - fileTimers[path] = timer + func getWatchedPaths() -> [URL] { + Array(fileStatuses.keys) } - func createFileSystemSource(fileDescriptor: Int32, - eventMask: DispatchSource.FileSystemEvent, - eventHandler: @escaping () -> Void, - cancelHandler: @escaping () -> Void) -> DispatchSourceFileSystemObject { + func createFileSystemSource( + fileDescriptor: Int32, + eventMask: DispatchSource.FileSystemEvent, + eventHandler: @escaping () -> Void, + cancelHandler: @escaping () -> Void + ) -> DispatchSourceFileSystemObject { let source = DispatchSource.makeFileSystemObjectSource( fileDescriptor: fileDescriptor, eventMask: eventMask, diff --git a/PVUI/Sources/PVUIBase/Game Library/PVGameLibraryUpdatesController.swift b/PVUI/Sources/PVUIBase/Game Library/PVGameLibraryUpdatesController.swift index 043dfd23f3..3566614891 100644 --- a/PVUI/Sources/PVUIBase/Game Library/PVGameLibraryUpdatesController.swift +++ b/PVUI/Sources/PVUIBase/Game Library/PVGameLibraryUpdatesController.swift @@ -259,7 +259,7 @@ public final class PVGameLibraryUpdatesController: ObservableObject { for await extractedFiles in directoryWatcher.extractedFilesStream(at: importPath) { var readyURLs:[URL] = [] for url in extractedFiles { - if (!directoryWatcher.isWatchingFile(at: url)) { + if await (!directoryWatcher.isWatchingFile(at: url)) { readyURLs.append(url) } } @@ -267,7 +267,7 @@ public final class PVGameLibraryUpdatesController: ObservableObject { gameImporter.addImports(forPaths: readyURLs) } - if (!directoryWatcher.isWatchingAnyFile()) { + if await (!directoryWatcher.isWatchingAnyFile()) { ILOG("I think all the imports are settled, might be ok to start the queue") gameImporter.startProcessing() }