Skip to content

Commit

Permalink
lock-free prefetch enqueue (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin authored Jan 24, 2021
1 parent 38119b5 commit 06742c5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 4 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ FetchContent_Declare(
FetchContent_MakeAvailable(sqlitecpp)
include_directories(${sqlitecpp_SOURCE_DIR}/include)

FetchContent_Declare(
concurrentqueue
GIT_REPOSITORY https://github.com/cameron314/readerwriterqueue.git
GIT_TAG v1.0.3
)
FetchContent_MakeAvailable(concurrentqueue)
include_directories(${concurrentqueue_SOURCE_DIR})

project(sqlite_nested_vfs VERSION 1.0
DESCRIPTION "SQLite VFS extension storing database pages in...a SQLite database"
LANGUAGES C CXX)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN apt-get -qq update && \
ADD . /work
WORKDIR /work

RUN cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc)
RUN rm -rf build && cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc)

WORKDIR /work/build
CMD ctest -V
4 changes: 2 additions & 2 deletions src/SQLiteNestedVFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {

const size_t MAX_FETCH_CURSORS = 4;
std::vector<std::unique_ptr<FetchJob>> fetch_jobs_;
ThreadPool fetch_thread_pool_;
ThreadPoolWithEnqueueFast fetch_thread_pool_;
std::mutex seek_lock_; // serializes outer db interactions among fetch background threads
std::atomic<bool> seek_interrupt_; // broadcast that main thread wants seek_lock_

Expand Down Expand Up @@ -445,7 +445,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
if (active_jobs + 2 <= fetch_thread_pool_.MaxThreads()) {
job->pageno = pageno_hint;
job->PutState(FetchJob::State::QUEUE);
fetch_thread_pool_.Enqueue(
fetch_thread_pool_.EnqueueFast(
job, [this](void *job) { return this->BackgroundFetchJob(job); }, nullptr);
} else {
assert(active_jobs + 1 == fetch_thread_pool_.MaxThreads());
Expand Down
57 changes: 56 additions & 1 deletion src/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <functional>
#include <mutex>
#include <queue>
#include <readerwriterqueue.h>
#include <thread>

namespace SQLiteNested {
Expand Down Expand Up @@ -88,7 +89,7 @@ class ThreadPool {
public:
ThreadPool(size_t max_threads, size_t max_jobs)
: max_threads_(max_threads), max_jobs_(max_jobs), ser_queue_(job_greater_) {}
~ThreadPool() {
virtual ~ThreadPool() {
{
std::lock_guard<std::mutex> lock(mutex_);
shutdown_ = true;
Expand Down Expand Up @@ -140,4 +141,58 @@ class ThreadPool {
}
};

// Adds lock-free EnqueueFast() for use on critical paths.
// - EnqueueFast() never blocks (!)
// - Only one thread should ever use it
class ThreadPoolWithEnqueueFast : public ThreadPool {
// concept: foreground thread adds job onto a lock-free queue, which a single background thread
// consumes to Enqueue()

struct EnqueueFastJob {
bool shutdown = false;
void *x = nullptr;
std::function<void *(void *) noexcept> par;
std::function<void(void *) noexcept> ser;
};

moodycamel::BlockingReaderWriterQueue<EnqueueFastJob> fast_queue_;
std::unique_ptr<std::thread> worker_thread_;

void EnqueueFastWorker() {
EnqueueFastJob job;
while (true) {
fast_queue_.wait_dequeue(job);
if (job.shutdown) {
break;
}
this->Enqueue(job.x, job.par, job.ser);
}
}

public:
ThreadPoolWithEnqueueFast(size_t max_threads, size_t max_jobs)
: ThreadPool(max_threads, max_jobs), fast_queue_(max_jobs) {}

~ThreadPoolWithEnqueueFast() {
if (worker_thread_) {
EnqueueFastJob job;
job.shutdown = true;
fast_queue_.enqueue(job);
worker_thread_->join();
}
}

void EnqueueFast(void *x, std::function<void *(void *) noexcept> par,
std::function<void(void *) noexcept> ser) {
EnqueueFastJob job;
job.x = x;
job.par = par;
job.ser = ser;
fast_queue_.enqueue(job);
if (!worker_thread_) {
worker_thread_.reset(new std::thread([this]() { this->EnqueueFastWorker(); }));
}
}
};

} // namespace SQLiteNested

0 comments on commit 06742c5

Please sign in to comment.