make file_list local to the scope of the poll loop to avoid holding too much memory after sync
joshuaboud committed Apr 28, 2021
1 parent 82c80f8 commit 8319a8e
Showing 2 changed files with 17 additions and 31 deletions.
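
Background for the change: std::vector::clear() destroys the elements but keeps the vector's allocation, so the old member file_list_ held on to the capacity of the largest sync for the lifetime of the Crawler. A vector declared inside the poll loop is destroyed at the end of every iteration, returning that memory to the allocator. Below is a minimal sketch of the difference, using a hypothetical Entry struct in place of the project's File class:

#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for the project's File class.
struct Entry {
    std::string path;
};

// Old pattern: a long-lived vector reused across iterations.
// clear() sets size to 0 but leaves capacity() untouched, so the
// allocation from the largest crawl is held until the owner dies.
void member_style() {
    std::vector<Entry> file_list; // imagine this as a class member
    for (int iteration = 0; iteration < 2; ++iteration) {
        for (int i = 0; i < 100000; ++i)
            file_list.push_back({"/snap/file" + std::to_string(i)});
        file_list.clear(); // elements destroyed, capacity retained
        std::cout << "capacity after clear: " << file_list.capacity() << '\n';
    }
}

// New pattern: the vector is local to the loop body, so its
// destructor frees the allocation at the end of every iteration.
void loop_local_style() {
    for (int iteration = 0; iteration < 2; ++iteration) {
        std::vector<Entry> file_list; // freed when this scope ends
        for (int i = 0; i < 100000; ++i)
            file_list.push_back({"/snap/file" + std::to_string(i)});
        std::cout << "files queued this iteration: " << file_list.size() << '\n';
    } // file_list destroyed here
}

int main() {
    member_style();
    loop_local_style();
    return 0;
}

The diff also drops the file_list_.shrink_to_fit() call in trigger_search; with the list now scoped to one poll iteration, any over-allocation is released as soon as the sync finishes rather than lingering between polls.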
35 changes: 14 additions & 21 deletions src/impl/crawler.cpp
@@ -29,17 +29,12 @@ extern "C"{

Crawler::Crawler(const fs::path &config_path, size_t envp_size, const ConfigOverrides &config_overrides)
: config_(config_path, config_overrides)
- , file_list_()
, last_rctime_(config_.last_rctime_path_)
, syncer(envp_size, config_){
base_path_ = config_.base_path_;
set_signal_handlers(this);
}

- void Crawler::reset(void){
- file_list_.clear();
- }

void Crawler::poll_base(bool seed, bool dry_run, bool set_rctime, bool oneshot){
timespec new_rctime = {0};
timespec old_rctime_cache = {0};
@@ -53,30 +48,29 @@ void Crawler::poll_base(bool seed, bool dry_run, bool set_rctime, bool oneshot){
Logging::log.message("Checking for change.", 2);
if(last_rctime_.check_for_change(base_path_, new_rctime)){
Logging::log.message("Change detected in " + base_path_.string(), 1);
+ std::vector<File> file_list;
uintmax_t total_bytes = 0;
// take snapshot
create_snap(new_rctime);
// wait for rctime to trickle to root
std::this_thread::sleep_for(config_.prop_delay_ms_);
// queue files
- trigger_search(snap_path_, total_bytes);
+ trigger_search(file_list, snap_path_, total_bytes);
if(!set_rctime){
std::string msg = "New files to sync: " + std::to_string(file_list_.size());
std::string msg = "New files to sync: " + std::to_string(file_list.size());
msg += " (" + Logging::log.format_bytes(total_bytes) + ")";
Logging::log.message(msg, 1);
}
// launch rsync
- if(!file_list_.empty()){
+ if(!file_list.empty()){
if(dry_run){
std::string msg = config_.exec_bin_ + " " + config_.exec_flags_ + " <file list> ";
msg += syncer.construct_destination(config_.remote_user_, config_.remote_host_, config_.remote_directory_);
Logging::log.message(msg, 1);
}else if(!set_rctime){
- syncer.launch_procs(file_list_, total_bytes);
+ syncer.launch_procs(file_list, total_bytes);
}
}
- // clear sync queue
- reset();
// delete snapshot
delete_snap();
// overwrite last_rctime
@@ -111,12 +105,12 @@ void Crawler::create_snap(const timespec &rctime){
}
}

- void Crawler::trigger_search(const fs::path &snap_path, uintmax_t &total_bytes){
+ void Crawler::trigger_search(std::vector<File> &file_list, const fs::path &snap_path, uintmax_t &total_bytes){
// launch crawler in snapshot
Logging::log.message("Launching crawler",2);
if(config_.threads_ == 1){ // DFS
// seed recursive function with snap_path
- find_new_files_recursive(snap_path, snap_path, total_bytes);
+ find_new_files_recursive(file_list, snap_path, snap_path, total_bytes);
}else if(config_.threads_ > 1){ // multithreaded BFS
std::atomic<uintmax_t> total_bytes_at(0);
std::atomic<int> threads_running(0);
@@ -126,19 +120,18 @@ void Crawler::trigger_search(const fs::path &snap_path, uintmax_t &total_bytes){
queue.push(snap_path);
// create threads
for(int i = 0; i < config_.threads_; i++){
- threads.emplace_back(&Crawler::find_new_files_mt_bfs, this, std::ref(queue), snap_path, std::ref(total_bytes_at), std::ref(threads_running));
+ threads.emplace_back(&Crawler::find_new_files_mt_bfs, this, std::ref(file_list), std::ref(queue), snap_path, std::ref(total_bytes_at), std::ref(threads_running));
}
for(auto &th : threads) th.join();
total_bytes = total_bytes_at;
}else{
Logging::log.error("Invalid number of worker threads: " + std::to_string(config_.threads_));
l::exit(EXIT_FAILURE);
}
- file_list_.shrink_to_fit();
// log list of new files
if(config_.log_level_ >= 2){ // skip loop if not logging
Logging::log.message("Files to sync:",2);
- for(auto &i : file_list_){
+ for(auto &i : file_list){
Logging::log.message(i.path(),2);
}
}
@@ -204,23 +197,23 @@ bool Crawler::ignore_entry(const File &file) const{
}
}

- void Crawler::find_new_files_recursive(fs::path current_path, const fs::path &snap_root, uintmax_t &total_bytes){
+ void Crawler::find_new_files_recursive(std::vector<File> &file_list, fs::path current_path, const fs::path &snap_root, uintmax_t &total_bytes){
size_t snap_root_len = snap_root.string().length();
for(fs::directory_iterator itr{current_path}; itr != fs::directory_iterator{}; *itr++){
fs::directory_entry entry = *itr;
const char *path = entry.path().c_str();
File file(path, snap_root_len);
if(ignore_entry(file)) continue;
if(file.is_directory()){
- find_new_files_recursive(entry.path(), snap_root, total_bytes); // recurse
+ find_new_files_recursive(file_list, entry.path(), snap_root, total_bytes); // recurse
}else{
total_bytes += file.size();
- file_list_.emplace_back(std::move(file));
+ file_list.emplace_back(std::move(file));
}
}
}

- void Crawler::find_new_files_mt_bfs(ConcurrentQueue<fs::path> &queue, const fs::path &snap_root, std::atomic<uintmax_t> &total_bytes, std::atomic<int> &threads_running){
+ void Crawler::find_new_files_mt_bfs(std::vector<File> &file_list, ConcurrentQueue<fs::path> &queue, const fs::path &snap_root, std::atomic<uintmax_t> &total_bytes, std::atomic<int> &threads_running){
threads_running++;
bool nodes_left = true;
fs::path node;
@@ -247,7 +240,7 @@ void Crawler::find_new_files_mt_bfs(ConcurrentQueue<fs::path> &queue, const fs::
total_bytes += file.size();
{
std::unique_lock<std::mutex> lk(file_list_mt_);
- file_list_.emplace_back(std::move(file));
+ file_list.emplace_back(std::move(file));
}
}
files_to_enqueue.clear();
13 changes: 3 additions & 10 deletions src/incl/crawler.hpp
@@ -39,9 +39,6 @@ class Crawler{
std::mutex file_list_mt_;
/* Make file_list_ thread safe for insertion.
*/
- std::vector<File> file_list_;
- /* List of files to send to remote backup.
- */
LastRctime last_rctime_;
/* Timestamp of last sync.
*/
@@ -63,10 +60,6 @@
~Crawler(void) = default;
/* Default destructor.
*/
- void reset(void);
- /* Clear file_list_ and set
- * payload_bytes_ to 0.
- */
void poll_base(bool seed, bool dry_run, bool set_rctime, bool oneshot);
/* Main loop of program.
* Polls for change in root sync path,
@@ -77,7 +70,7 @@
void create_snap(const timespec &rctime);
/* Create snapshot in base directory
*/
- void trigger_search(const boost::filesystem::path& snap_path, uintmax_t& total_bytes);
+ void trigger_search(std::vector<File> &file_list, const boost::filesystem::path& snap_path, uintmax_t& total_bytes);
/* Queues newly modified/created files
* into file_list_, keeps tally of filesize in
* total_bytes.
@@ -86,12 +79,12 @@
/* Returns true if file should not be queued or directory should
* not be searched.
*/
- void find_new_files_recursive(fs::path current_path, const fs::path &snap_root, uintmax_t &total_bytes);
+ void find_new_files_recursive(std::vector<File> &file_list, fs::path current_path, const fs::path &snap_root, uintmax_t &total_bytes);
/* Recursive DFS on directory tree to queue files.
* Keeps tally of filesize in total_bytes.
* This is used if threads == 1.
*/
- void find_new_files_mt_bfs(ConcurrentQueue<fs::path> &queue, const fs::path &snap_root, std::atomic<uintmax_t> &total_bytes, std::atomic<int> &threads_running);
+ void find_new_files_mt_bfs(std::vector<File> &file_list, ConcurrentQueue<fs::path> &queue, const fs::path &snap_root, std::atomic<uintmax_t> &total_bytes, std::atomic<int> &threads_running);
/* Worker thread function to do multithreaded BFS on directory tree to queue files.
* Keeps tally of filesize in total_bytes.
* This is used if threads > 1.
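
Since the list is no longer a member, the multithreaded BFS path now hands it to each worker through std::ref, and file_list_mt_ still serializes the emplace_back calls. The sketch below shows that hand-off pattern in isolation; the Entry struct, worker bodies, and thread count are placeholders rather than the project's real crawl logic:

#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Placeholder for the project's File type.
struct Entry {
    std::string path;
};

std::mutex list_mutex; // plays the role of file_list_mt_

// Each worker appends its results into the caller's vector.
// The reference stays valid because the caller joins all threads
// before the vector goes out of scope.
void worker(std::vector<Entry> &file_list, int id) {
    for (int i = 0; i < 3; ++i) {
        Entry e{"/snap/worker" + std::to_string(id) + "/file" + std::to_string(i)};
        std::lock_guard<std::mutex> lk(list_mutex);
        file_list.emplace_back(std::move(e));
    }
}

int main() {
    std::vector<Entry> file_list; // scoped to the caller, like the new code
    std::vector<std::thread> threads;
    for (int id = 0; id < 4; ++id)
        threads.emplace_back(worker, std::ref(file_list), id); // std::ref avoids copying
    for (auto &th : threads)
        th.join();
    std::cout << "queued " << file_list.size() << " entries\n";
    return 0;
}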
