Skip to content

Commit

Permalink
some improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
namchuai committed Nov 19, 2024
1 parent a5b704c commit 339459e
Showing 1 changed file with 116 additions and 82 deletions.
198 changes: 116 additions & 82 deletions engine/services/file_watcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,49 @@
#endif

class FileWatcherService {
private:
#if defined(_WIN32)
HANDLE dirHandle;
#elif defined(__APPLE__)
FSEventStreamRef event_stream;
#else // Linux
std::unordered_map<int, std::string> watchDescriptors;
#endif

public:
FileWatcherService(const std::string& path,
std::shared_ptr<ModelService> model_service)
: watchPath{path}, running{false} {
if (!std::filesystem::exists(path)) {
throw std::runtime_error("Path does not exist: " + path);
}
CTL_INF("FileWatcherService created: " + path);
}

~FileWatcherService() { stop(); }

void start() {
if (running)
if (running) {
return;
}

running = true;
watchThread = std::thread(&FileWatcherService::watcherThread, this);
}

void stop() {
CTL_INF("FileWatcherService stop");
#ifdef _WIN32
CloseHandle(dirHandle);
#endif

#ifdef Linux
cleanupWatches();
#endif
running = false;
if (watchThread.joinable()) {
watchThread.join();
}
CTL_INF("FileWatcherService stopped!");
}

private:
Expand All @@ -51,140 +72,153 @@ class FileWatcherService {
std::shared_ptr<ModelService> model_service_;

#ifdef __APPLE__

static void callback(ConstFSEventStreamRef streamRef,
void* clientCallBackInfo, size_t numEvents,
void* eventPaths,
const FSEventStreamEventFlags eventFlags[],
const FSEventStreamEventId eventIds[]) {
char** paths = (char**)eventPaths;
// model_service->ForceIndexingModelList();
FileWatcherService* watcher =
static_cast<FileWatcherService*>(clientCallBackInfo);
watcher->model_service_->ForceIndexingModelList();
auto** paths = (char**)eventPaths;
auto* watcher = static_cast<FileWatcherService*>(clientCallBackInfo);

for (size_t i = 0; i < numEvents; i++) {
if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) {
std::cout << "File deleted: " << paths[i] << std::endl;
std::cout << "File deleted: " << paths[i]
<< std::endl; // todo: remove after debug finished
watcher->model_service_->ForceIndexingModelList();
}
}
}

void watcherThread() {
FSEventStreamContext context = {0, this, nullptr, nullptr, nullptr};
CFStringRef pathRef = CFStringCreateWithCString(nullptr, watchPath.c_str(),
kCFStringEncodingUTF8);
CFArrayRef pathsToWatch =
CFArrayCreate(nullptr, (const void**)&pathRef, 1, nullptr);

FSEventStreamRef stream =
FSEventStreamCreate(nullptr, &callback, &context, pathsToWatch,
kFSEventStreamEventIdSinceNow,
// macOS implementation
auto mypath = CFStringCreateWithCString(NULL, watchPath.c_str(),
kCFStringEncodingUTF8);
auto path_to_watch = CFArrayCreate(NULL, (const void**)&mypath, 1, NULL);

FSEventStreamContext context = {0, this, NULL, NULL, NULL};

event_stream =
FSEventStreamCreate(NULL, &FileWatcherService::callback, &context,
path_to_watch, kFSEventStreamEventIdSinceNow,
0.5, // 500ms latency
kFSEventStreamCreateFlagFileEvents);

FSEventStreamScheduleWithRunLoop(stream, CFRunLoopGetCurrent(),
kCFRunLoopDefaultMode);
FSEventStreamStart(stream);
dispatch_queue_t queue = dispatch_get_main_queue();
FSEventStreamSetDispatchQueue(event_stream, queue);
FSEventStreamStart(event_stream);

CTL_INF("NamH start loop");
while (running) {
CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false);
}

FSEventStreamStop(stream);
FSEventStreamUnscheduleFromRunLoop(stream, CFRunLoopGetCurrent(),
kCFRunLoopDefaultMode);
FSEventStreamInvalidate(stream);
FSEventStreamRelease(stream);
CFRelease(pathsToWatch);
CFRelease(pathRef);
FSEventStreamStop(event_stream);
FSEventStreamInvalidate(event_stream);
FSEventStreamRelease(event_stream);
CFRelease(path_to_watch);
CFRelease(mypath);
}

#elif defined(_WIN32)
void watcherThread() {
HANDLE hDir =
CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
nullptr, OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, nullptr);

if (hDir == INVALID_HANDLE_VALUE) {
std::cerr << "Failed to open directory" << std::endl;
return;
dirHandle = CreateFileA(
path.c_str(), FILE_LIST_DIRECTORY,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL);

if (dirHandle == INVALID_HANDLE_VALUE) {
throw std::runtime_error("Failed to open directory");
}

char buffer[4096];
DWORD bytesReturned;
OVERLAPPED overlapped = {0};
overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);

while (running) {
ReadDirectoryChangesW(
hDir, buffer, sizeof(buffer), TRUE,
FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME,
&bytesReturned, &overlapped, nullptr);

WaitForSingleObject(overlapped.hEvent, 1000);

FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer;
do {
if (event->Action == FILE_ACTION_REMOVED) {
wchar_t fileName[MAX_PATH];
memcpy(fileName, event->FileName, event->FileNameLength);
fileName[event->FileNameLength / 2] = L'\0';
std::wcout << L"File deleted: " << fileName << std::endl;
}
if (ReadDirectoryChangesW(
dirHandle, buffer, sizeof(buffer),
TRUE, // Watch subtree
FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME,
&bytesReturned, &overlapped, NULL)) {
FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer;
do {
if (event->Action == FILE_ACTION_REMOVED) {
wchar_t fileName[MAX_PATH];
wcsncpy_s(fileName, event->FileName,
event->FileNameLength / sizeof(wchar_t));
fileName[event->FileNameLength / sizeof(wchar_t)] = '\0';
std::wcout << L"Deleted: " << fileName << std::endl;
model_service_->ForceIndexingModelList();
}

if (event->NextEntryOffset == 0) {
break;
}
event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event +
event->NextEntryOffset);
} while (true);
}
}
}

if (event->NextEntryOffset == 0)
break;
event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event +
event->NextEntryOffset);
} while (true);
#else // Linux

ResetEvent(overlapped.hEvent);
void addWatch(const std::string& dirPath) {
wd = inotify_add_watch(fd, dirPath.c_str(),
IN_DELETE | IN_CREATE | IN_DELETE_SELF);
if (wd < 0) {
throw std::runtime_error("Failed to add watch on: " + dirPath);
}
watchDescriptors[wd] = dirPath;

CloseHandle(overlapped.hEvent);
CloseHandle(hDir);
// Recursively add watches to subdirectories
for (const auto& entry :
std::filesystem::recursive_directory_iterator(dirPath)) {
if (std::filesystem::is_directory(entry)) {
addWatch(entry.path().string());
}
}
}

#else // Linux
void watcherThread() {
int fd = inotify_init();
if (fd < 0) {
std::cerr << "Failed to initialize inotify" << std::endl;
return;
void cleanupWatches() {
for (const auto& [wd, path] : watchDescriptors) {
inotify_rm_watch(fd, wd);
}
watchDescriptors.clear();

int wd = inotify_add_watch(fd, watchPath.c_str(), IN_DELETE);
if (wd < 0) {
std::cerr << "Failed to add watch" << std::endl;
if (fd >= 0) {
close(fd);
return;
fd = -1;
}
}

const size_t event_size = sizeof(struct inotify_event);
const size_t buf_len = 1024 * (event_size + 16);
char buffer[buf_len];
void watcherThread() {
fd = inotify_init();
if (fd < 0) {
throw std::runtime_error("Failed to initialize inotify");
}

// Add initial watch on the main directory
addWatch(path);

char buffer[4096];
while (running) {
int length = read(fd, buffer, buf_len);
int length = read(fd, buffer, sizeof(buffer));
if (length < 0) {
if (errno == EINTR)
continue;
break;
continue;
}

int i = 0;
while (i < length) {
struct inotify_event* event = (struct inotify_event*)&buffer[i];
if (event->len && (event->mask & IN_DELETE)) {
std::cout << "File deleted: " << event->name << std::endl;
if (event->mask & IN_DELETE) {
auto deletedPath = watchDescriptors[event->wd] + "/" + event->name;
std::cout << "Deleted: " << deletedPath << std::endl;
}
i += event_size + event->len;
i += sizeof(struct inotify_event) + event->len;
}
}

inotify_rm_watch(fd, wd);
close(fd);
}
#endif
Expand Down

0 comments on commit 339459e

Please sign in to comment.