diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ce5b62..899a69d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,6 +25,14 @@ FetchContent_Declare( FetchContent_MakeAvailable(sqlitecpp) include_directories(${sqlitecpp_SOURCE_DIR}/include) +FetchContent_Declare( + concurrentqueue + GIT_REPOSITORY https://github.com/cameron314/readerwriterqueue.git + GIT_TAG v1.0.3 +) +FetchContent_MakeAvailable(concurrentqueue) +include_directories(${concurrentqueue_SOURCE_DIR}) + project(sqlite_nested_vfs VERSION 1.0 DESCRIPTION "SQLite VFS extension storing database pages in...a SQLite database" LANGUAGES C CXX) diff --git a/Dockerfile b/Dockerfile index 66ed6b0..8cb0dad 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN apt-get -qq update && \ ADD . /work WORKDIR /work -RUN cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc) +RUN rm -rf build && cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc) WORKDIR /work/build CMD ctest -V diff --git a/src/SQLiteNestedVFS.h b/src/SQLiteNestedVFS.h index 275d339..d25d888 100644 --- a/src/SQLiteNestedVFS.h +++ b/src/SQLiteNestedVFS.h @@ -296,7 +296,7 @@ class InnerDatabaseFile : public SQLiteVFS::File { const size_t MAX_FETCH_CURSORS = 4; std::vector> fetch_jobs_; - ThreadPool fetch_thread_pool_; + ThreadPoolWithEnqueueFast fetch_thread_pool_; std::mutex seek_lock_; // serializes outer db interactions among fetch background threads std::atomic seek_interrupt_; // broadcast that main thread wants seek_lock_ @@ -445,7 +445,7 @@ class InnerDatabaseFile : public SQLiteVFS::File { if (active_jobs + 2 <= fetch_thread_pool_.MaxThreads()) { job->pageno = pageno_hint; job->PutState(FetchJob::State::QUEUE); - fetch_thread_pool_.Enqueue( + fetch_thread_pool_.EnqueueFast( job, [this](void *job) { return this->BackgroundFetchJob(job); }, nullptr); } else { assert(active_jobs + 1 == fetch_thread_pool_.MaxThreads()); diff --git a/src/ThreadPool.h b/src/ThreadPool.h index 6dd166d..c5896e8 100644 --- a/src/ThreadPool.h +++ b/src/ThreadPool.h @@ -13,6 +13,7 @@ #include #include #include +#include #include namespace SQLiteNested { @@ -88,7 +89,7 @@ class ThreadPool { public: ThreadPool(size_t max_threads, size_t max_jobs) : max_threads_(max_threads), max_jobs_(max_jobs), ser_queue_(job_greater_) {} - ~ThreadPool() { + virtual ~ThreadPool() { { std::lock_guard lock(mutex_); shutdown_ = true; @@ -140,4 +141,58 @@ class ThreadPool { } }; +// Adds lock-free EnqueueFast() for use on critical paths. +// - EnqueueFast() never blocks (!) +// - Only one thread should ever use it +class ThreadPoolWithEnqueueFast : public ThreadPool { + // concept: foreground thread adds job onto a lock-free queue, which a single background thread + // consumes to Enqueue() + + struct EnqueueFastJob { + bool shutdown = false; + void *x = nullptr; + std::function par; + std::function ser; + }; + + moodycamel::BlockingReaderWriterQueue fast_queue_; + std::unique_ptr worker_thread_; + + void EnqueueFastWorker() { + EnqueueFastJob job; + while (true) { + fast_queue_.wait_dequeue(job); + if (job.shutdown) { + break; + } + this->Enqueue(job.x, job.par, job.ser); + } + } + + public: + ThreadPoolWithEnqueueFast(size_t max_threads, size_t max_jobs) + : ThreadPool(max_threads, max_jobs), fast_queue_(max_jobs) {} + + ~ThreadPoolWithEnqueueFast() { + if (worker_thread_) { + EnqueueFastJob job; + job.shutdown = true; + fast_queue_.enqueue(job); + worker_thread_->join(); + } + } + + void EnqueueFast(void *x, std::function par, + std::function ser) { + EnqueueFastJob job; + job.x = x; + job.par = par; + job.ser = ser; + fast_queue_.enqueue(job); + if (!worker_thread_) { + worker_thread_.reset(new std::thread([this]() { this->EnqueueFastWorker(); })); + } + } +}; + } // namespace SQLiteNested