Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Change recursive_mutex to mutex in DatabaseRotatingImp #5276

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Builds/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ test.app > xrpl.basics
test.app > xrpld.app
test.app > xrpld.core
test.app > xrpld.ledger
test.app > xrpld.nodestore
test.app > xrpld.overlay
test.app > xrpld.rpc
test.app > xrpl.json
Expand Down
136 changes: 136 additions & 0 deletions src/test/app/SHAMapStore_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
#include <test/jtx.h>
#include <test/jtx/envconfig.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/main/NodeStoreScheduler.h>
#include <xrpld/app/misc/SHAMapStore.h>
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpld/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/protocol/jss.h>
#include <boost/lexical_cast.hpp>
#include <thread>

namespace ripple {
namespace test {
Expand Down Expand Up @@ -518,12 +522,144 @@ class SHAMapStore_test : public beast::unit_test::suite
lastRotated = ledgerSeq - 1;
}

/** Build and open a NodeStore backend rooted at `path`.

    Mirrors the backend construction that SHAMapStoreImp normally
    performs internally, so the rotation tests can create backends
    directly. Returns an empty pointer if `path` is empty.
*/
std::unique_ptr<NodeStore::Backend>
makeBackendRotating(
    jtx::Env& env,
    NodeStoreScheduler& scheduler,
    std::string path)
{
    // An empty path is a test-authoring error; bail out early.
    if (!BEAST_EXPECT(path.size()))
        return {};

    Section nodeDbSection{
        env.app().config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path const dbPath{path};
    nodeDbSection.set("path", dbPath.string());

    auto const burst = megabytes(
        env.app().config().getValueFor(SizedItem::burstSize, std::nullopt));
    auto result = NodeStore::Manager::instance().make_Backend(
        nodeDbSection,
        burst,
        scheduler,
        env.app().logs().journal("NodeStoreTest"));
    result->open();
    return result;
}
// Verifies that concurrent and re-entrant calls to
// DatabaseRotatingImp::rotateWithLock fail fast (return false) instead of
// deadlocking. Timing- and thread-ordering-sensitive by design.
void
testRotateWithLockContention()
{
// The only purpose of this test is to ensure that if something that
// should never happen happens, we don't get a deadlock.
testcase("rotate with lock contention");

using namespace jtx;
Env env(*this, envconfig(onlineDelete));

/////////////////////////////////////////////////////////////
// Create the backend. Normally, SHAMapStoreImp handles all these
// details
auto nscfg = env.app().config().section(ConfigSection::nodeDatabase());
// The unit-test flag relaxes the "rotation contention is impossible"
// assertion inside DatabaseRotatingImp so this test can exercise it.
nscfg.set(
NodeStore::DatabaseRotatingImp::unitTestFlag, std::to_string(true));

// Provide default values:
if (!nscfg.exists("cache_size"))
nscfg.set(
"cache_size",
std::to_string(env.app().config().getValueFor(
SizedItem::treeCacheSize, std::nullopt)));

if (!nscfg.exists("cache_age"))
nscfg.set(
"cache_age",
std::to_string(env.app().config().getValueFor(
SizedItem::treeCacheAge, std::nullopt)));

NodeStoreScheduler scheduler(env.app().getJobQueue());

std::string const writableDb = "write";
std::string const archiveDb = "archive";
auto writableBackend = makeBackendRotating(env, scheduler, writableDb);
auto archiveBackend = makeBackendRotating(env, scheduler, archiveDb);

// Create NodeStore with two backends to allow online deletion of
// data
constexpr int readThreads = 4;
auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
scheduler,
readThreads,
std::move(writableBackend),
std::move(archiveBackend),
nscfg,
env.app().logs().journal("NodeStoreTest"));

/////////////////////////////////////////////////////////////
// Create the impossible situation. Get several calls to rotateWithLock
// going in parallel using a callback that just delays
using namespace std::chrono_literals;
std::atomic<int> threadNum = 0;
auto const cb = [&](std::string const& writableBackendName) {
using namespace std::chrono_literals;
BEAST_EXPECT(writableBackendName == "write");
auto newBackend = makeBackendRotating(
env, scheduler, std::to_string(++threadNum));
// Keep the first rotation in flight long enough that the other
// threads' attempts are guaranteed to overlap with it and fail.
std::this_thread::sleep_for(5s);
return newBackend;
};

std::atomic<int> successes = 0;
std::atomic<int> failures = 0;
std::vector<std::thread> threads;
threads.reserve(5);
for (int i = 0; i < 5; ++i)
{
threads.emplace_back([&]() {
if (dbr->rotateWithLock(cb))
++successes;
else
++failures;
});
// There's no point in trying to time the threads to line up at
// exact points, but introduce a little bit of staggering to be more
// "realistic".
std::this_thread::sleep_for(10ms * i);
}
for (auto& t : threads)
{
t.join();
}
// Exactly one of the five parallel rotations may win; the rest must
// return false rather than block.
BEAST_EXPECT(successes == 1);
BEAST_EXPECT(failures == 4);
// Only one thread will invoke the callback to increment threadNum
BEAST_EXPECT(threadNum == 1);
BEAST_EXPECT(dbr->getName() == "1");

/////////////////////////////////////////////////////////////
// Create another impossible situation. Try to re-enter rotateWithLock
// inside the callback.
auto const cbReentrant = [&](std::string const& writableBackendName) {
BEAST_EXPECT(writableBackendName == "1");
auto newBackend = makeBackendRotating(
env, scheduler, std::to_string(++threadNum));
// The nested (re-entrant) rotation must fail, not deadlock.
BEAST_EXPECT(!dbr->rotateWithLock(cb));
return newBackend;
};
// The outer rotation still succeeds even though its callback attempted
// (and was denied) a nested rotation.
BEAST_EXPECT(dbr->rotateWithLock(cbReentrant));

BEAST_EXPECT(successes == 1);
BEAST_EXPECT(failures == 4);
BEAST_EXPECT(threadNum == 2);
BEAST_EXPECT(dbr->getName() == "2");
}

// Suite entry point: runs each test case in sequence.
void
run() override
{
testClear();
testAutomatic();
testCanDelete();
testRotateWithLockContention();
}
};

Expand Down
33 changes: 21 additions & 12 deletions src/xrpld/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,18 +366,27 @@ SHAMapStoreImp::run()

lastRotated = validatedSeq;

dbRotating_->rotateWithLock(
[&](std::string const& writableBackendName) {
SavedState savedState;
savedState.writableDb = newBackend->getName();
savedState.archiveDb = writableBackendName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);

clearCaches(validatedSeq);

return std::move(newBackend);
});
if (!dbRotating_->rotateWithLock(
[&](std::string const& writableBackendName) {
SavedState savedState;
savedState.writableDb = newBackend->getName();
savedState.archiveDb = writableBackendName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);

clearCaches(validatedSeq);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 363 the clearCaches(validatedSeq) is already called - is calling it here again needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 363 the clearCaches(validatedSeq) is already called - is calling it here again needed?

I believe so because some time could have passed between the two calls, and this helps clear out any entries that were added in that time.


return std::move(newBackend);
vvysokikh1 marked this conversation as resolved.
Show resolved Hide resolved
}))
{
UNREACHABLE(
"ripple::SHAMapStoreImp::run rotateWithLock failed");
JLOG(journal_.error())
<< validatedSeq
<< " rotation failed. Discard unused new backend.";
newBackend->setDeletePath();
return;
}

JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
}
Expand Down
8 changes: 6 additions & 2 deletions src/xrpld/nodestore/DatabaseRotating.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ class DatabaseRotating : public Database

/** Rotates the backends.
@param f A function executed before the rotation
@return bool indicating whether the callback "f" was called, and the new
Backend it returned is stored
*/
virtual void
[[nodiscard]]
virtual bool
rotateWithLock(std::function<std::unique_ptr<NodeStore::Backend>(
std::string const& writableBackendName)> const& f) = 0;
};
Expand Down
44 changes: 34 additions & 10 deletions src/xrpld/nodestore/detail/DatabaseRotatingImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <xrpld/nodestore/detail/DatabaseRotatingImp.h>
#include <xrpl/protocol/HashPrefix.h>
#include <boost/thread/locks.hpp>

namespace ripple {
namespace NodeStore {
Expand All @@ -33,45 +34,68 @@ DatabaseRotatingImp::DatabaseRotatingImp(
: DatabaseRotating(scheduler, readThreads, config, j)
, writableBackend_(std::move(writableBackend))
, archiveBackend_(std::move(archiveBackend))
, unitTest_(get<bool>(config, unitTestFlag, false))
{
if (writableBackend_)
fdRequired_ += writableBackend_->fdRequired();
if (archiveBackend_)
fdRequired_ += archiveBackend_->fdRequired();
}

/** Rotate the backends: the writable backend becomes the archive (and the
    old archive is marked for deletion), with a new writable backend
    supplied by the callback `f`.

    @param f Builds the replacement writable backend; invoked with the
             current writable backend's name while the upgrade lock is
             held (readers are NOT blocked during this call).
    @return true if the rotation happened; false if another rotation was
            already in progress (the callback is not invoked in that case).
*/
[[nodiscard]] bool
DatabaseRotatingImp::rotateWithLock(
    std::function<std::unique_ptr<NodeStore::Backend>(
        std::string const& writableBackendName)> const& f)
{
    // This function should be the only one taking any kind of unique/write
    // lock, and should only be called once at a time by its synchronous
    // caller.

    // The upgradable lock will NOT block shared locks, but will block other
    // upgrade locks and unique/exclusive locks.
    boost::upgrade_lock upgradeableLock(mutex_, boost::defer_lock);
    if (!upgradeableLock.try_lock())
    {
        // Another rotation is in progress (possibly a re-entrant call from
        // the callback itself). If anything other than a unit test gets
        // here, something has gone very wrong.
        XRPL_ASSERT(
            unitTest_,
            "ripple::NodeStore::DatabaseRotatingImp::rotateWithLock "
            "unit testing");
        return false;
    }

    // Build the replacement backend while readers can still make progress.
    auto newBackend = f(writableBackend_->getName());

    // boost::upgrade_mutex guarantees that only one thread can have "upgrade
    // ownership" at a time, so this is 100% safe, and guaranteed to avoid
    // deadlock.
    boost::upgrade_to_unique_lock writeLock(upgradeableLock);

    archiveBackend_->setDeletePath();
    archiveBackend_ = std::move(writableBackend_);
    writableBackend_ = std::move(newBackend);

    return true;
}

/** Return the name of the current writable backend.
    Thread-safe: takes a shared (read) lock, so it may run concurrently
    with other readers; only rotateWithLock takes exclusive ownership. */
std::string
DatabaseRotatingImp::getName() const
{
    boost::shared_lock lock(mutex_);
    return writableBackend_->getName();
}

/** Return the current writable backend's write load.
    Thread-safe: a shared (read) lock is sufficient — the backend pointer
    is only replaced under the exclusive lock in rotateWithLock. */
std::int32_t
DatabaseRotatingImp::getWriteLoad() const
{
    boost::shared_lock lock(mutex_);
    return writableBackend_->getWriteLoad();
}

void
DatabaseRotatingImp::importDatabase(Database& source)
{
auto const backend = [&] {
std::lock_guard lock(mutex_);
boost::shared_lock lock(mutex_);
return writableBackend_;
}();

Expand All @@ -81,7 +105,7 @@ DatabaseRotatingImp::importDatabase(Database& source)
/** Flush the writable backend.
    Thread-safe: a shared (read) lock is sufficient — syncing does not
    change which backend is writable. */
void
DatabaseRotatingImp::sync()
{
    boost::shared_lock lock(mutex_);
    writableBackend_->sync();
}

Expand All @@ -95,7 +119,7 @@ DatabaseRotatingImp::store(
auto nObj = NodeObject::createObject(type, std::move(data), hash);

auto const backend = [&] {
std::lock_guard lock(mutex_);
boost::shared_lock lock(mutex_);
return writableBackend_;
}();

Expand Down Expand Up @@ -149,7 +173,7 @@ DatabaseRotatingImp::fetchNodeObject(
std::shared_ptr<NodeObject> nodeObject;

auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
boost::shared_lock lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

Expand All @@ -163,7 +187,7 @@ DatabaseRotatingImp::fetchNodeObject(
{
{
// Refresh the writable backend pointer
std::lock_guard lock(mutex_);
boost::shared_lock lock(mutex_);
writable = writableBackend_;
}

Expand All @@ -184,7 +208,7 @@ DatabaseRotatingImp::for_each(
std::function<void(std::shared_ptr<NodeObject>)> f)
{
auto [writable, archive] = [&] {
std::lock_guard lock(mutex_);
boost::shared_lock lock(mutex_);
return std::make_pair(writableBackend_, archiveBackend_);
}();

Expand Down
23 changes: 14 additions & 9 deletions src/xrpld/nodestore/detail/DatabaseRotatingImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

#include <xrpld/nodestore/DatabaseRotating.h>

#include <mutex>
#include <boost/thread/shared_mutex.hpp>

namespace ripple {
namespace NodeStore {
Expand All @@ -48,7 +48,7 @@ class DatabaseRotatingImp : public DatabaseRotating
stop();
}

void
[[nodiscard]] bool
rotateWithLock(
std::function<std::unique_ptr<NodeStore::Backend>(
std::string const& writableBackendName)> const& f) override;
Expand Down Expand Up @@ -79,16 +79,21 @@ class DatabaseRotatingImp : public DatabaseRotating
void
sweep() override;

// Include the space in the name to ensure it can't be set in a file
static constexpr auto unitTestFlag = " unit_test";

private:
std::shared_ptr<Backend> writableBackend_;
std::shared_ptr<Backend> archiveBackend_;
// This needs to be a recursive mutex because callbacks in `rotateWithLock`
// can call function that also lock the mutex. A current example of this is
// a callback from SHAMapStoreImp, which calls `clearCaches`. This
// `clearCaches` call eventually calls `fetchNodeObject` which tries to
// relock the mutex. It would be desirable to rewrite the code so the lock
// was not held during a callback.
mutable std::recursive_mutex mutex_;
bool const unitTest_;

// Implements the "UpgradeLockable" concept
// https://www.boost.org/doc/libs/release/doc/html/thread/synchronization.html#thread.synchronization.mutex_concepts.upgrade_lockable
// In short: Many threads can have shared ownership. One thread can have
// upgradable ownership at the same time as others have shared ownership.
// The upgradeable ownership can be upgraded to exclusive ownership,
// blocking if necessary until no other threads have shared ownership.
mutable boost::upgrade_mutex mutex_;

std::shared_ptr<NodeObject>
fetchNodeObject(
Expand Down
Loading