diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp index 8185988cc2..0b11dd2273 100644 --- a/src/bucket/BucketManagerImpl.cpp +++ b/src/bucket/BucketManagerImpl.cpp @@ -972,8 +972,9 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq) }); mEvictionFuture = task->get_future(); - mApp.postOnBackgroundThread(bind(&task_t::operator(), task), - "SearchableBucketListSnapshot: eviction scan"); + mApp.postOnEvictionBackgroundThread( + bind(&task_t::operator(), task), + "SearchableBucketListSnapshot: eviction scan"); } void @@ -982,13 +983,13 @@ BucketManagerImpl::resolveBackgroundEvictionScan( LedgerKeySet const& modifiedKeys) { ZoneScoped; + releaseAssert(threadIsMain()); if (!mEvictionFuture.valid()) { startBackgroundEvictionScan(ledgerSeq); } - mEvictionFuture.wait(); auto evictionCandidates = mEvictionFuture.get(); auto const& networkConfig = @@ -1000,7 +1001,6 @@ BucketManagerImpl::resolveBackgroundEvictionScan( networkConfig.stateArchivalSettings())) { startBackgroundEvictionScan(ledgerSeq); - mEvictionFuture.wait(); evictionCandidates = mEvictionFuture.get(); } diff --git a/src/bucket/BucketManagerImpl.h b/src/bucket/BucketManagerImpl.h index dbbdc960fe..9c60d460d8 100644 --- a/src/bucket/BucketManagerImpl.h +++ b/src/bucket/BucketManagerImpl.h @@ -62,17 +62,6 @@ class BucketManagerImpl : public BucketManager std::future mEvictionFuture{}; - // Lock for managing raw Bucket files or the bucket directory. This lock is - // only required for file access, but is not required for logical changes to - // the BucketList (i.e. addBatch). - mutable std::recursive_mutex mBucketFileMutex; - - // Lock for logical BucketList changes and snapshots (i.e. addBatch, - // getSearchableSnapshot). This lock is not required for raw Bucket file - // management. - mutable std::recursive_mutex mBucketSnapshotMutex; - - bool const mDeleteEntireBucketDirInDtor; // Records bucket-merges that are currently _live_ in some FutureBucket, in diff --git a/src/main/Application.h b/src/main/Application.h index 33de728d38..b836852a87 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -226,12 +226,18 @@ class Application // this io_context will execute in parallel with the calling thread, so use // with caution. virtual asio::io_context& getWorkerIOContext() = 0; + virtual asio::io_context& getEvictionIOContext() = 0; virtual void postOnMainThread( std::function&& f, std::string&& name, Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION) = 0; + + // While both are lower priority than the main thread, eviction threads have + // more priority than regular worker background threads virtual void postOnBackgroundThread(std::function&& f, std::string jobName) = 0; + virtual void postOnEvictionBackgroundThread(std::function&& f, + std::string jobName) = 0; // Perform actions necessary to transition from BOOTING_STATE to other // states. In particular: either reload or reinitialize the database, and diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 8d470cdf8d..9a098e707f 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -81,9 +81,20 @@ namespace stellar ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) : mVirtualClock(clock) , mConfig(cfg) - , mWorkerIOContext(mConfig.WORKER_THREADS) + // Allocate one worker to eviction when background eviction enabled + , mWorkerIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN + ? mConfig.WORKER_THREADS - 1 + : mConfig.WORKER_THREADS) + , mEvictionIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN + ? std::make_optional(1) + : std::nullopt) , mWork(std::make_unique(mWorkerIOContext)) + , mEvictionWork( + mEvictionIOContext + ? std::make_unique(*mEvictionIOContext) + : nullptr) , mWorkerThreads() + , mEvictionThread() , mStopSignals(clock.getIOContext(), SIGINT) , mStarted(false) , mStopping(false) @@ -135,6 +146,21 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) auto t = mConfig.WORKER_THREADS; LOG_DEBUG(DEFAULT_LOG, "Application constructing (worker threads: {})", t); + + if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN) + { + releaseAssert(mConfig.WORKER_THREADS > 0); + releaseAssert(mEvictionIOContext); + + // Allocate one thread for Eviction scan + mEvictionThread = std::thread{[this]() { + runCurrentThreadWithMediumPriority(); + mEvictionIOContext->run(); + }}; + + --t; + } + while (t--) { auto thread = std::thread{[this]() { @@ -760,12 +786,21 @@ ApplicationImpl::validateAndLogConfig() "stellar-core new-db."); } - if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN && - !mConfig.isUsingBucketListDB()) + if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN) { - throw std::invalid_argument( - "EXPERIMENTAL_BUCKETLIST_DB must be enabled to use " - "EXPERIMENTAL_BACKGROUND_EVICTION_SCAN"); + if (!mConfig.isUsingBucketListDB()) + { + throw std::invalid_argument( + "EXPERIMENTAL_BUCKETLIST_DB must be enabled to use " + "EXPERIMENTAL_BACKGROUND_EVICTION_SCAN"); + } + + if (mConfig.WORKER_THREADS < 2) + { + throw std::invalid_argument( + "EXPERIMENTAL_BACKGROUND_EVICTION_SCAN requires " + "WORKER_THREADS > 1"); + } } if (isNetworkedValidator && mConfig.isInMemoryMode()) @@ -917,6 +952,18 @@ ApplicationImpl::joinAllThreads() { w.join(); } + + if (mEvictionWork) + { + mEvictionWork.reset(); + } + + if (mEvictionThread) + { + LOG_DEBUG(DEFAULT_LOG, "Joining eviction thread"); + mEvictionThread->join(); + } + LOG_DEBUG(DEFAULT_LOG, "Joined all {} threads", mWorkerThreads.size()); } @@ -1386,6 +1433,13 @@ ApplicationImpl::getWorkerIOContext() return mWorkerIOContext; } +asio::io_context& +ApplicationImpl::getEvictionIOContext() +{ + releaseAssert(mEvictionIOContext); + return *mEvictionIOContext; +} + void ApplicationImpl::postOnMainThread(std::function&& f, std::string&& name, Scheduler::ActionType type) @@ -1418,6 +1472,18 @@ ApplicationImpl::postOnBackgroundThread(std::function&& f, }); } +void +ApplicationImpl::postOnEvictionBackgroundThread(std::function&& f, + std::string jobName) +{ + LogSlowExecution isSlow{std::move(jobName), LogSlowExecution::Mode::MANUAL, + "executed after"}; + asio::post(getEvictionIOContext(), [this, f = std::move(f), isSlow]() { + mPostOnBackgroundThreadDelay.Update(isSlow.checkElapsedTime()); + f(); + }); +} + void ApplicationImpl::enableInvariantsFromConfig() { diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index d1edcab341..f99a97e5c6 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -78,10 +78,13 @@ class ApplicationImpl : public Application virtual StatusManager& getStatusManager() override; virtual asio::io_context& getWorkerIOContext() override; + virtual asio::io_context& getEvictionIOContext() override; virtual void postOnMainThread(std::function&& f, std::string&& name, Scheduler::ActionType type) override; virtual void postOnBackgroundThread(std::function&& f, std::string jobName) override; + virtual void postOnEvictionBackgroundThread(std::function&& f, + std::string jobName) override; virtual void start() override; void startServices(); @@ -147,7 +150,9 @@ class ApplicationImpl : public Application // subsystems. asio::io_context mWorkerIOContext; + std::optional mEvictionIOContext; std::unique_ptr mWork; + std::unique_ptr mEvictionWork; std::unique_ptr mBucketManager; std::unique_ptr mDatabase; @@ -187,6 +192,12 @@ class ApplicationImpl : public Application std::vector mWorkerThreads; + // Unlike mWorkerThreads (which are low priority), eviction scans require a + // medium priority thread. In the future, this may become a more general + // higher-priority worker thread type, but for now we only need a single + // thread for eviction scans. + std::optional mEvictionThread; + asio::signal_set mStopSignals; bool mStarted; diff --git a/src/main/Config.cpp b/src/main/Config.cpp index abbebb61b5..a71efc14f7 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -1299,7 +1299,7 @@ Config::processConfig(std::shared_ptr t) } else if (item.first == "WORKER_THREADS") { - WORKER_THREADS = readInt(item, 1, 1000); + WORKER_THREADS = readInt(item, 2, 1000); } else if (item.first == "MAX_CONCURRENT_SUBPROCESSES") { diff --git a/src/test/FuzzerImpl.cpp b/src/test/FuzzerImpl.cpp index c633d2d487..f6e5bc5459 100644 --- a/src/test/FuzzerImpl.cpp +++ b/src/test/FuzzerImpl.cpp @@ -867,7 +867,7 @@ getFuzzConfig(int instanceNumber) cfg.ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING = false; cfg.ARTIFICIALLY_SET_CLOSE_TIME_FOR_TESTING = UINT32_MAX; cfg.HTTP_PORT = 0; - cfg.WORKER_THREADS = 1; + cfg.WORKER_THREADS = 2; cfg.QUORUM_INTERSECTION_CHECKER = false; cfg.PREFERRED_PEERS_ONLY = false; cfg.RUN_STANDALONE = true; diff --git a/src/util/Thread.cpp b/src/util/Thread.cpp index eeecaeaa06..4a58c9485e 100644 --- a/src/util/Thread.cpp +++ b/src/util/Thread.cpp @@ -19,11 +19,11 @@ namespace stellar #if defined(_WIN32) -void -runCurrentThreadWithLowPriority() +static void +runCurrentThreadWithPriority(int priority) { HANDLE curThread = ::GetCurrentThread(); - BOOL ret = ::SetThreadPriority(curThread, THREAD_PRIORITY_BELOW_NORMAL); + BOOL ret = ::SetThreadPriority(curThread, priority); if (!ret) { @@ -31,31 +31,52 @@ runCurrentThreadWithLowPriority() } } -#elif defined(__linux__) - void runCurrentThreadWithLowPriority() { - constexpr auto const LOW_PRIORITY_NICE = 5; + runCurrentThreadWithPriority(THREAD_PRIORITY_LOWEST); +} + +void +runCurrentThreadWithMediumPriority() +{ + runCurrentThreadWithPriority(THREAD_PRIORITY_BELOW_NORMAL); +} - auto newNice = nice(LOW_PRIORITY_NICE); - if (newNice != LOW_PRIORITY_NICE) +#elif defined(__linux__) + +static void +runCurrentThreadWithPriority(int priority) +{ + auto newNice = nice(priority); + if (newNice != priority) { LOG_DEBUG(DEFAULT_LOG, "Unable to run worker thread with low priority. " "Normal priority will be used."); } } -#elif defined(__APPLE__) - void runCurrentThreadWithLowPriority() +{ + runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5); +} + +void +runCurrentThreadWithMediumPriority() +{ + runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3); +} + +#elif defined(__APPLE__) + +static void +runCurrentThreadWithPriority(int priority) { // Default MacOS priority is 31 in a user-mode band from 0..63, niceing (or // other priority-adjustment) usually subtracts from there. Range is +/- 16, // with lower meaning lower (i.e. UTILITY class is 20). The standard // pthreads API works for adjusting a single thread's priority. - constexpr auto const LOW_PRIORITY_NICE = 5; struct sched_param sp; int policy; int ret = pthread_getschedparam(pthread_self(), &policy, &sp); @@ -63,7 +84,7 @@ runCurrentThreadWithLowPriority() { LOG_DEBUG(DEFAULT_LOG, "Unable to get priority for thread: {}", ret); } - sp.sched_priority -= LOW_PRIORITY_NICE; + sp.sched_priority -= priority; ret = pthread_setschedparam(pthread_self(), policy, &sp); if (ret != 0) { @@ -71,6 +92,17 @@ runCurrentThreadWithLowPriority() } } +void +runCurrentThreadWithLowPriority() +{ + runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5); +} + +void +runCurrentThreadWithMediumPriority() +{ + runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3); +} #else void @@ -78,5 +110,10 @@ runCurrentThreadWithLowPriority() { } +void +runCurrentThreadWithMediumPriority() +{ +} + #endif } diff --git a/src/util/Thread.h b/src/util/Thread.h index c3e5b28b0d..fb0d886e0c 100644 --- a/src/util/Thread.h +++ b/src/util/Thread.h @@ -12,6 +12,7 @@ namespace stellar { void runCurrentThreadWithLowPriority(); +void runCurrentThreadWithMediumPriority(); template bool diff --git a/test-tx-meta-baseline-current/InvokeHostFunctionTests.json b/test-tx-meta-baseline-current/InvokeHostFunctionTests.json index 2e4206dec0..a7b26749b8 100644 --- a/test-tx-meta-baseline-current/InvokeHostFunctionTests.json +++ b/test-tx-meta-baseline-current/InvokeHostFunctionTests.json @@ -1369,6 +1369,6 @@ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], - "temp entry eviction" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], + "temp entry eviction|sql" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], "transaction validation diagnostics" : [ "bKDF6V5IzTo=" ] } diff --git a/test-tx-meta-baseline-next/InvokeHostFunctionTests.json b/test-tx-meta-baseline-next/InvokeHostFunctionTests.json index 2d628a6b51..ec03383576 100644 --- a/test-tx-meta-baseline-next/InvokeHostFunctionTests.json +++ b/test-tx-meta-baseline-next/InvokeHostFunctionTests.json @@ -1373,8 +1373,6 @@ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], - "temp entry eviction" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], - "transaction validation diagnostics" : [ "bKDF6V5IzTo=" ], - "version test" : [ "766L+TYsWqA=" ], - "version test acceptance" : [ "766L+TYsWqA=" ] + "temp entry eviction|sql" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ], + "transaction validation diagnostics" : [ "bKDF6V5IzTo=" ] }