Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split core's database into "main" and "misc" to allow concurrent read/writes #4533

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
423 changes: 308 additions & 115 deletions src/database/Database.cpp

Large diffs are not rendered by default.

171 changes: 85 additions & 86 deletions src/database/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ namespace stellar
class Application;
class SQLLogContext;

using PreparedStatementCache =
std::map<std::string, std::shared_ptr<soci::statement>>;

// smallest schema version supported
static constexpr unsigned long MIN_MAIN_SCHEMA_VERSION = 21;
static constexpr unsigned long MAIN_SCHEMA_VERSION = 23;
static constexpr unsigned long FIRST_MAIN_VERSION_WITH_MISC = 23;

// Misc schema version 0 means no misc table exists yet
static constexpr unsigned long MIN_MISC_SCHEMA_VERSION = 0;
static constexpr unsigned long MISC_SCHEMA_VERSION = 1;

/**
* Helper class for borrowing a SOCI prepared statement handle into a local
* scope and cleaning it up once done with it. Returned by
Expand Down Expand Up @@ -60,6 +72,28 @@ class StatementContext : NonCopyable
}
};

class SessionWrapper : NonCopyable
{
soci::session mSession;
std::string const mSessionName;

public:
SessionWrapper(std::string sessionName)
: mSessionName(std::move(sessionName))
{
}
soci::session&
session()
{
return mSession;
}
std::string const&
getSessionName() const
{
return mSessionName;
}
};

/**
* Object that owns the database connection(s) that an application
* uses to store the current ledger and other persistent state in.
Expand All @@ -84,22 +118,45 @@ class StatementContext : NonCopyable
* (SQL isolation level 'SERIALIZABLE' in Postgresql and Sqlite, neither of
* which provide true serializability).
*/

struct PairHash
{
std::size_t
operator()(const std::pair<std::string, std::string>& key) const
{
return std::hash<std::string>{}(key.first) ^
(std::hash<std::string>{}(key.second) << 1);
}
};

class Database : NonMovableOrCopyable
{
Application& mApp;
medida::Meter& mQueryMeter;
soci::session mSession;
SessionWrapper mSession;
SessionWrapper mMiscSession;

std::unique_ptr<soci::connection_pool> mPool;
std::unique_ptr<soci::connection_pool> mMiscPool;

std::map<std::string, std::shared_ptr<soci::statement>> mStatements;
// Cache key -> session name <> query
using PrepStatementCacheKey = std::pair<std::string, std::string>;
std::unordered_map<PrepStatementCacheKey, std::shared_ptr<soci::statement>,
PairHash>
mStatements;
medida::Counter& mStatementsSize;

std::set<std::string> mEntityTypes;

static bool gDriversRegistered;
static void registerDrivers();
void applySchemaUpgrade(unsigned long vers);
void applyMiscSchemaUpgrade(unsigned long vers);
void open();
// Save `vers` as schema version of main DB.
void putMainSchemaVersion(unsigned long vers);
// Save `vers` as schema version of misc DB.
void putMiscSchemaVersion(unsigned long vers);
void populateMiscDatabase();
std::string getSQLiteDBLocation(soci::session& session);

public:
// Instantiate object and connect to app.getConfig().DATABASE;
Expand All @@ -118,8 +175,10 @@ class Database : NonMovableOrCopyable
// Return a helper object that borrows, from the Database, a prepared
// statement handle for the provided query. The prepared statement handle
// is created if necessary before borrowing, and reset (unbound from data)
// when the statement context is destroyed.
StatementContext getPreparedStatement(std::string const& query);
// when the statement context is destroyed. Prepared statements caches are
// per DB session.
StatementContext getPreparedStatement(std::string const& query,
SessionWrapper& session);

// Purge all cached prepared statements, closing their handles with the
// database.
Expand All @@ -142,6 +201,10 @@ class Database : NonMovableOrCopyable
// Return true if the Database target is SQLite, otherwise false.
bool isSqlite() const;

// Return true is the Database can use a miscellaneous database
// Currently, misc db is only supported by on-disk SQLite
bool canUseMiscDB() const;

// Return an optional SQL COLLATION clause to use for text-typed columns in
// this database, in order to ensure they're compared "simply" using
// byte-value comparisons, i.e. in a non-language-sensitive fashion. For
Expand All @@ -151,7 +214,8 @@ class Database : NonMovableOrCopyable

// Call `op` back with the specific database backend subtype in use.
template <typename T>
T doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation<T>& op);
T doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation<T>& op,
SessionWrapper& session);

// Return true if a connection pool is available for worker threads
// to read from the database through, otherwise false.
Expand All @@ -161,26 +225,26 @@ class Database : NonMovableOrCopyable
// by the new-db command on stellar-core.
void initialize();

// Save `vers` as schema version.
void putSchemaVersion(unsigned long vers);

// Get current schema version in DB.
unsigned long getDBSchemaVersion();

// Get current schema version of running application.
unsigned long getAppSchemaVersion();
// Get current schema version of main DB.
unsigned long getMainDBSchemaVersion();
// Get current schema version of misc DB.
unsigned long getMiscDBSchemaVersion();

// Check schema version and apply any upgrades if necessary.
void upgradeToCurrentSchema();

void dropTxMetaIfExists();

// Soci named session wrapper
SessionWrapper& getSession();
SessionWrapper& getMiscSession();
// Access the underlying SOCI session object
soci::session& getSession();
// Use these to directly access the soci session object
soci::session& getRawSession();
soci::session& getRawMiscSession();

// Access the optional SOCI connection pool available for worker
// threads. Throws an error if !canUsePool().
soci::connection_pool& getPool();
soci::connection_pool& getMiscPool();
};

template <typename T>
Expand Down Expand Up @@ -208,75 +272,10 @@ doDatabaseTypeSpecificOperation(soci::session& session,

template <typename T>
T
Database::doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation<T>& op)
{
return stellar::doDatabaseTypeSpecificOperation(mSession, op);
}

// Select a set of records using a client-defined query string, then map
// each record into an element of a client-defined datatype by applying a
// client-defined function (the records are accumulated in the "out"
// vector).
template <typename T>
void
selectMap(Database& db, std::string const& selectStr,
std::function<T(soci::row const&)> makeT, std::vector<T>& out)
{
soci::rowset<soci::row> rs = (db.getSession().prepare << selectStr);

std::transform(rs.begin(), rs.end(), std::back_inserter(out), makeT);
}

// Map each element in the given vector of a client-defined datatype into a
// SQL update command by applying a client-defined function, then send those
// update strings to the database.
//
// The "postUpdate" function receives the number of records affected
// by the given update, as well as the element of the client-defined
// datatype which generated that update.
template <typename T>
void updateMap(Database& db, std::vector<T> const& in,
std::string const& updateStr,
std::function<void(soci::statement&, T const&)> prepUpdate,
std::function<void(long long const, T const&)> postUpdate);
template <typename T>
void
updateMap(Database& db, std::vector<T> const& in, std::string const& updateStr,
std::function<void(soci::statement&, T const&)> prepUpdate,
std::function<void(long long const, T const&)> postUpdate)
{
auto st_update = db.getPreparedStatement(updateStr).statement();

for (auto& recT : in)
{
prepUpdate(st_update, recT);
st_update.define_and_bind();
st_update.execute(true);
auto affected_rows = st_update.get_affected_rows();
st_update.clean_up(false);
postUpdate(affected_rows, recT);
}
}

// The composition of updateMap() following selectMap().
//
// Returns the number of records selected by selectMap() (all of which were
// then passed through updateMap() before the selectUpdateMap() call
// returned).
template <typename T>
size_t
selectUpdateMap(Database& db, std::string const& selectStr,
std::function<T(soci::row const&)> makeT,
std::string const& updateStr,
std::function<void(soci::statement&, T const&)> prepUpdate,
std::function<void(long long const, T const&)> postUpdate)
Database::doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation<T>& op,
SessionWrapper& session)
{
std::vector<T> vecT;

selectMap<T>(db, selectStr, makeT, vecT);
updateMap<T>(db, vecT, updateStr, prepUpdate, postUpdate);

return vecT.size();
return stellar::doDatabaseTypeSpecificOperation(session.session(), op);
}

template <typename T>
Expand Down
15 changes: 7 additions & 8 deletions src/database/test/DatabaseTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ transactionTest(Application::pointer app)
int a0 = a + 1;
int a1 = a + 2;

auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

session << "DROP TABLE IF EXISTS test";
session << "CREATE TABLE test (x INTEGER)";
Expand Down Expand Up @@ -104,7 +104,7 @@ checkMVCCIsolation(Application::pointer app)

int s2r1 = 0, s2r2 = 0, s2r3 = 0, s2r4 = 0;

auto& sess1 = app->getDatabase().getSession();
auto& sess1 = app->getDatabase().getRawSession();

sess1 << "DROP TABLE IF EXISTS test";
sess1 << "CREATE TABLE test (x INTEGER)";
Expand Down Expand Up @@ -217,7 +217,7 @@ TEST_CASE("postgres smoketest", "[db]")
Application::pointer app = createTestApplication(clock, cfg);
int a = 10, b = 0;

auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

SECTION("round trip")
{
Expand Down Expand Up @@ -249,7 +249,7 @@ TEST_CASE("postgres smoketest", "[db]")

SECTION("postgres MVCC test")
{
app->getDatabase().getSession() << "drop table if exists test";
app->getDatabase().getRawSession() << "drop table if exists test";
checkMVCCIsolation(app);
}
}
Expand Down Expand Up @@ -279,7 +279,7 @@ TEST_CASE("postgres performance", "[db][pgperf][!hide]")
try
{
Application::pointer app = createTestApplication(clock, cfg);
auto& session = app->getDatabase().getSession();
auto& session = app->getDatabase().getRawSession();

session << "drop table if exists txtest;";
session << "create table txtest (a bigint, b bigint, c bigint, primary "
Expand Down Expand Up @@ -355,7 +355,6 @@ TEST_CASE("schema test", "[db]")
Application::pointer app = createTestApplication(clock, cfg);

auto& db = app->getDatabase();
auto dbv = db.getDBSchemaVersion();
auto av = db.getAppSchemaVersion();
REQUIRE(dbv == av);
auto dbv = db.getMainDBSchemaVersion();
REQUIRE(dbv == MAIN_SCHEMA_VERSION);
}
8 changes: 5 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2057,16 +2057,18 @@ void
HerderImpl::persistUpgrades()
{
ZoneScoped;
releaseAssert(threadIsMain());
auto s = mUpgrades.getParameters().toJson();
mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s);
mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s,
mApp.getDatabase().getMiscSession());
}

void
HerderImpl::restoreUpgrades()
{
ZoneScoped;
std::string s =
mApp.getPersistentState().getState(PersistentState::kLedgerUpgrades);
std::string s = mApp.getPersistentState().getState(
PersistentState::kLedgerUpgrades, mApp.getDatabase().getMiscSession());
if (!s.empty())
{
Upgrades::UpgradeParameters p;
Expand Down
1 change: 0 additions & 1 deletion src/herder/HerderPersistence.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,5 @@ class HerderPersistence
static void dropAll(Database& db);
static void deleteOldEntries(Database& db, uint32_t ledgerSeq,
uint32_t count);
static void createQuorumTrackingTable(soci::session& sess);
};
}
Loading
Loading