Skip to content

Commit

Permalink
Added medium priority threads and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Apr 10, 2024
1 parent 15fccc9 commit c9b2cbe
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 37 deletions.
8 changes: 4 additions & 4 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,9 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
});

mEvictionFuture = task->get_future();
mApp.postOnBackgroundThread(bind(&task_t::operator(), task),
"SearchableBucketListSnapshot: eviction scan");
mApp.postOnEvictionBackgroundThread(
bind(&task_t::operator(), task),
"SearchableBucketListSnapshot: eviction scan");
}

void
Expand All @@ -982,13 +983,13 @@ BucketManagerImpl::resolveBackgroundEvictionScan(
LedgerKeySet const& modifiedKeys)
{
ZoneScoped;
releaseAssert(threadIsMain());

if (!mEvictionFuture.valid())
{
startBackgroundEvictionScan(ledgerSeq);
}

mEvictionFuture.wait();
auto evictionCandidates = mEvictionFuture.get();

auto const& networkConfig =
Expand All @@ -1000,7 +1001,6 @@ BucketManagerImpl::resolveBackgroundEvictionScan(
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq);
mEvictionFuture.wait();
evictionCandidates = mEvictionFuture.get();
}

Expand Down
11 changes: 0 additions & 11 deletions src/bucket/BucketManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,6 @@ class BucketManagerImpl : public BucketManager

std::future<EvictionResult> mEvictionFuture{};

// Lock for managing raw Bucket files or the bucket directory. This lock is
// only required for file access, but is not required for logical changes to
// the BucketList (i.e. addBatch).
mutable std::recursive_mutex mBucketFileMutex;

// Lock for logical BucketList changes and snapshots (i.e. addBatch,
// getSearchableSnapshot). This lock is not required for raw Bucket file
// management.
mutable std::recursive_mutex mBucketSnapshotMutex;


bool const mDeleteEntireBucketDirInDtor;

// Records bucket-merges that are currently _live_ in some FutureBucket, in
Expand Down
6 changes: 6 additions & 0 deletions src/main/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,18 @@ class Application
// this io_context will execute in parallel with the calling thread, so use
// with caution.
virtual asio::io_context& getWorkerIOContext() = 0;
virtual asio::io_context& getEvictionIOContext() = 0;

virtual void postOnMainThread(
std::function<void()>&& f, std::string&& name,
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION) = 0;

// While both are lower priority than the main thread, eviction threads have
// more priority than regular worker background threads
virtual void postOnBackgroundThread(std::function<void()>&& f,
std::string jobName) = 0;
virtual void postOnEvictionBackgroundThread(std::function<void()>&& f,
std::string jobName) = 0;

// Perform actions necessary to transition from BOOTING_STATE to other
// states. In particular: either reload or reinitialize the database, and
Expand Down
78 changes: 72 additions & 6 deletions src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,20 @@ namespace stellar
ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
: mVirtualClock(clock)
, mConfig(cfg)
, mWorkerIOContext(mConfig.WORKER_THREADS)
// Allocate one worker to eviction when background eviction enabled
, mWorkerIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN
? mConfig.WORKER_THREADS - 1
: mConfig.WORKER_THREADS)
, mEvictionIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN
? std::make_optional<asio::io_context>(1)
: std::nullopt)
, mWork(std::make_unique<asio::io_context::work>(mWorkerIOContext))
, mEvictionWork(
mEvictionIOContext
? std::make_unique<asio::io_context::work>(*mEvictionIOContext)
: nullptr)
, mWorkerThreads()
, mEvictionThread()
, mStopSignals(clock.getIOContext(), SIGINT)
, mStarted(false)
, mStopping(false)
Expand Down Expand Up @@ -135,6 +146,21 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)

auto t = mConfig.WORKER_THREADS;
LOG_DEBUG(DEFAULT_LOG, "Application constructing (worker threads: {})", t);

if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN)
{
releaseAssert(mConfig.WORKER_THREADS > 0);
releaseAssert(mEvictionIOContext);

// Allocate one thread for Eviction scan
mEvictionThread = std::thread{[this]() {
runCurrentThreadWithMediumPriority();
mEvictionIOContext->run();
}};

--t;
}

while (t--)
{
auto thread = std::thread{[this]() {
Expand Down Expand Up @@ -760,12 +786,21 @@ ApplicationImpl::validateAndLogConfig()
"stellar-core new-db.");
}

if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN &&
!mConfig.isUsingBucketListDB())
if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN)
{
throw std::invalid_argument(
"EXPERIMENTAL_BUCKETLIST_DB must be enabled to use "
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN");
if (!mConfig.isUsingBucketListDB())
{
throw std::invalid_argument(
"EXPERIMENTAL_BUCKETLIST_DB must be enabled to use "
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN");
}

if (mConfig.WORKER_THREADS < 2)
{
throw std::invalid_argument(
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN requires "
"WORKER_THREADS > 1");
}
}

if (isNetworkedValidator && mConfig.isInMemoryMode())
Expand Down Expand Up @@ -917,6 +952,18 @@ ApplicationImpl::joinAllThreads()
{
w.join();
}

if (mEvictionWork)
{
mEvictionWork.reset();
}

if (mEvictionThread)
{
LOG_DEBUG(DEFAULT_LOG, "Joining eviction thread");
mEvictionThread->join();
}

LOG_DEBUG(DEFAULT_LOG, "Joined all {} threads", mWorkerThreads.size());
}

Expand Down Expand Up @@ -1386,6 +1433,13 @@ ApplicationImpl::getWorkerIOContext()
return mWorkerIOContext;
}

asio::io_context&
ApplicationImpl::getEvictionIOContext()
{
releaseAssert(mEvictionIOContext);
return *mEvictionIOContext;
}

void
ApplicationImpl::postOnMainThread(std::function<void()>&& f, std::string&& name,
Scheduler::ActionType type)
Expand Down Expand Up @@ -1418,6 +1472,18 @@ ApplicationImpl::postOnBackgroundThread(std::function<void()>&& f,
});
}

void
ApplicationImpl::postOnEvictionBackgroundThread(std::function<void()>&& f,
std::string jobName)
{
LogSlowExecution isSlow{std::move(jobName), LogSlowExecution::Mode::MANUAL,
"executed after"};
asio::post(getEvictionIOContext(), [this, f = std::move(f), isSlow]() {
mPostOnBackgroundThreadDelay.Update(isSlow.checkElapsedTime());
f();
});
}

void
ApplicationImpl::enableInvariantsFromConfig()
{
Expand Down
11 changes: 11 additions & 0 deletions src/main/ApplicationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ class ApplicationImpl : public Application
virtual StatusManager& getStatusManager() override;

virtual asio::io_context& getWorkerIOContext() override;
virtual asio::io_context& getEvictionIOContext() override;
virtual void postOnMainThread(std::function<void()>&& f, std::string&& name,
Scheduler::ActionType type) override;
virtual void postOnBackgroundThread(std::function<void()>&& f,
std::string jobName) override;
virtual void postOnEvictionBackgroundThread(std::function<void()>&& f,
std::string jobName) override;

virtual void start() override;
void startServices();
Expand Down Expand Up @@ -147,7 +150,9 @@ class ApplicationImpl : public Application
// subsystems.

asio::io_context mWorkerIOContext;
std::optional<asio::io_context> mEvictionIOContext;
std::unique_ptr<asio::io_context::work> mWork;
std::unique_ptr<asio::io_context::work> mEvictionWork;

std::unique_ptr<BucketManager> mBucketManager;
std::unique_ptr<Database> mDatabase;
Expand Down Expand Up @@ -187,6 +192,12 @@ class ApplicationImpl : public Application

std::vector<std::thread> mWorkerThreads;

// Unlike mWorkerThreads (which are low priority), eviction scans require a
// medium priority thread. In the future, this may become a more general
// higher-priority worker thread type, but for now we only need a single
// thread for eviction scans.
std::optional<std::thread> mEvictionThread;

asio::signal_set mStopSignals;

bool mStarted;
Expand Down
2 changes: 1 addition & 1 deletion src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
}
else if (item.first == "WORKER_THREADS")
{
WORKER_THREADS = readInt<int>(item, 1, 1000);
WORKER_THREADS = readInt<int>(item, 2, 1000);
}
else if (item.first == "MAX_CONCURRENT_SUBPROCESSES")
{
Expand Down
2 changes: 1 addition & 1 deletion src/test/FuzzerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ getFuzzConfig(int instanceNumber)
cfg.ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING = false;
cfg.ARTIFICIALLY_SET_CLOSE_TIME_FOR_TESTING = UINT32_MAX;
cfg.HTTP_PORT = 0;
cfg.WORKER_THREADS = 1;
cfg.WORKER_THREADS = 2;
cfg.QUORUM_INTERSECTION_CHECKER = false;
cfg.PREFERRED_PEERS_ONLY = false;
cfg.RUN_STANDALONE = true;
Expand Down
61 changes: 49 additions & 12 deletions src/util/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,64 +19,101 @@ namespace stellar

#if defined(_WIN32)

void
runCurrentThreadWithLowPriority()
static void
runCurrentThreadWithPriority(int priority)
{
HANDLE curThread = ::GetCurrentThread();
BOOL ret = ::SetThreadPriority(curThread, THREAD_PRIORITY_BELOW_NORMAL);
BOOL ret = ::SetThreadPriority(curThread, priority);

if (!ret)
{
LOG_DEBUG(DEFAULT_LOG, "Unable to set priority for thread: {}", ret);
}
}

#elif defined(__linux__)

void
runCurrentThreadWithLowPriority()
{
constexpr auto const LOW_PRIORITY_NICE = 5;
runCurrentThreadWithPriority(THREAD_PRIORITY_LOWEST);
}

void
runCurrentThreadWithMediumPriority()
{
runCurrentThreadWithPriority(THREAD_PRIORITY_BELOW_NORMAL);
}

auto newNice = nice(LOW_PRIORITY_NICE);
if (newNice != LOW_PRIORITY_NICE)
#elif defined(__linux__)

static void
runCurrentThreadWithPriority(int priority)
{
auto newNice = nice(priority);
if (newNice != priority)
{
LOG_DEBUG(DEFAULT_LOG, "Unable to run worker thread with low priority. "
"Normal priority will be used.");
}
}

#elif defined(__APPLE__)

void
runCurrentThreadWithLowPriority()
{
runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5);
}

void
runCurrentThreadWithMediumPriority()
{
runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3);
}

#elif defined(__APPLE__)

static void
runCurrentThreadWithPriority(int priority)
{
// Default MacOS priority is 31 in a user-mode band from 0..63, niceing (or
// other priority-adjustment) usually subtracts from there. Range is +/- 16,
// with lower meaning lower (i.e. UTILITY class is 20). The standard
// pthreads API works for adjusting a single thread's priority.
constexpr auto const LOW_PRIORITY_NICE = 5;
struct sched_param sp;
int policy;
int ret = pthread_getschedparam(pthread_self(), &policy, &sp);
if (ret != 0)
{
LOG_DEBUG(DEFAULT_LOG, "Unable to get priority for thread: {}", ret);
}
sp.sched_priority -= LOW_PRIORITY_NICE;
sp.sched_priority -= priority;
ret = pthread_setschedparam(pthread_self(), policy, &sp);
if (ret != 0)
{
LOG_DEBUG(DEFAULT_LOG, "Unable to set priority for thread: {}", ret);
}
}

void
runCurrentThreadWithLowPriority()
{
runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5);
}

void
runCurrentThreadWithMediumPriority()
{
runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3);
}
#else

void
runCurrentThreadWithLowPriority()
{
}

void
runCurrentThreadWithMediumPriority()
{
}

#endif
}
1 change: 1 addition & 0 deletions src/util/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace stellar
{

void runCurrentThreadWithLowPriority();
void runCurrentThreadWithMediumPriority();

template <typename T>
bool
Expand Down
2 changes: 1 addition & 1 deletion test-tx-meta-baseline-current/InvokeHostFunctionTests.json
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,6 @@
"bKDF6V5IzTo=",
"bKDF6V5IzTo="
],
"temp entry eviction" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ],
"temp entry eviction|sql" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ],
"transaction validation diagnostics" : [ "bKDF6V5IzTo=" ]
}
2 changes: 1 addition & 1 deletion test-tx-meta-baseline-next/InvokeHostFunctionTests.json
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,6 @@
"bKDF6V5IzTo=",
"bKDF6V5IzTo="
],
"temp entry eviction" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ],
"temp entry eviction|sql" : [ "bKDF6V5IzTo=", "bKDF6V5IzTo=" ],
"transaction validation diagnostics" : [ "bKDF6V5IzTo=" ]
}

0 comments on commit c9b2cbe

Please sign in to comment.