diff --git a/src/database/Database.cpp b/src/database/Database.cpp index 4ab34363cc..da5b363b2d 100644 --- a/src/database/Database.cpp +++ b/src/database/Database.cpp @@ -61,10 +61,6 @@ using namespace std; bool Database::gDriversRegistered = false; -// smallest schema version supported -static unsigned long const MIN_SCHEMA_VERSION = 21; -static unsigned long const SCHEMA_VERSION = 23; - // These should always match our compiled version precisely, since we are // using a bundled version to get access to carray(). But in case someone // overrides that or our build configuration changes, it's nicer to get a @@ -112,6 +108,13 @@ badSqliteVersion(int vers) return msg.str(); } +static std::string +getMiscDBName(std::string const& mainDB) +{ + std::string miscDB = mainDB.substr(0, mainDB.length() - 3) + "-misc.db"; + return miscDB; +} + void Database::registerDrivers() { @@ -185,6 +188,8 @@ Database::Database(Application& app) : mApp(app) , mQueryMeter( app.getMetrics().NewMeter({"database", "query", "exec"}, "query")) + , mSession("main") + , mMiscSession("misc") , mStatementsSize( app.getMetrics().NewCounter({"database", "memory", "statements"})) { @@ -199,9 +204,142 @@ Database::Database(Application& app) void Database::open() { - mSession.open(mApp.getConfig().DATABASE.value); - DatabaseConfigureSessionOp op(mSession); - doDatabaseTypeSpecificOperation(op); + mSession.session().open(mApp.getConfig().DATABASE.value); + DatabaseConfigureSessionOp op(mSession.session()); + doDatabaseTypeSpecificOperation(op, mSession); + + if (canUseMiscDB()) + { + std::string miscDB = getMiscDBName(mApp.getConfig().DATABASE.value); + mMiscSession.session().open(miscDB); + DatabaseConfigureSessionOp miscOp(mMiscSession.session()); + doDatabaseTypeSpecificOperation(miscOp, mMiscSession); + } +} + +static std::string +getDeleteStateQuery(Application& app, bool isMain) +{ + auto const& ps = app.getPersistentState(); + std::vector states; + for (auto let : xdr::xdr_traits::enum_values()) + { + states.push_back( + ps.getStoreStateName(PersistentState::kRebuildLedger, let)); + } + states.push_back(ps.getStoreStateName(PersistentState::kLastClosedLedger)); + states.push_back( + ps.getStoreStateName(PersistentState::kHistoryArchiveState)); + states.push_back(ps.getStoreStateName(PersistentState::kDatabaseSchema)); + states.push_back(ps.getStoreStateName(PersistentState::kNetworkPassphrase)); + states.push_back(ps.getStoreStateName(PersistentState::kDBBackend)); + + std::string query = fmt::format( + "DELETE FROM storestate WHERE statename {} IN (", isMain ? "NOT" : ""); + bool first = true; + for (auto const& stateName : states) + { + query = fmt::format("{}{}'{}'", query, first ? "" : ", ", stateName); + first = false; + } + query += ")"; + return query; +} + +std::string +Database::getSQLiteDBLocation(soci::session& session) +{ + releaseAssert(isSqlite()); + std::string loc; + int i; + std::string databaseName, databaseLocation; + soci::statement st = + (session.prepare << "PRAGMA database_list;", soci::into(i), + soci::into(databaseName), soci::into(databaseLocation)); + st.execute(true); + while (st.got_data()) + { + if (databaseName == "main") + { + loc = databaseLocation; + break; + } + } + return loc; +} + +void +Database::populateMiscDatabase() +{ + auto loc = getSQLiteDBLocation(getRawMiscSession()); + releaseAssert(!loc.empty()); + + // Step 1: Attach the source database + getRawMiscSession() << "ATTACH DATABASE '" + loc + "' AS source_db"; + + // Step 2: Retrieve table names from the source database + std::vector tableNames = { + "peers", "ban", "quoruminfo", "scpquorums", + "scphistory", "pubsub", "storestate"}; + + // Step 3: Copy data from each table + for (const auto& tableName : tableNames) + { + std::string insertQuery = "INSERT INTO " + tableName + + " SELECT * FROM source_db." + tableName; + + getRawMiscSession() << insertQuery; + CLOG_INFO(Database, "Data from {} copied successfully!", tableName); + } + + // Step 4: Remove data from storestate that doesn't belong there + getRawMiscSession() << getDeleteStateQuery(mApp, /* isMain */ false); + CLOG_INFO(Database, "Data from storestate removed successfully!"); + + // Step 5: Persist misc DB schema version + putMiscSchemaVersion(MISC_SCHEMA_VERSION); + CLOG_INFO(Database, "Misc: Schema version set to {}", MISC_SCHEMA_VERSION); +} + +void +Database::applyMiscSchemaUpgrade(unsigned long vers) +{ + clearPreparedStatementCache(); + soci::transaction tx(mMiscSession.session()); + switch (vers) + { + case 1: + // Create tables for the first time. + OverlayManager::dropAll(*this); + PersistentState::dropMisc(*this); + ExternalQueue::dropAll(*this); + HerderPersistence::dropAll(*this); + BanManager::dropAll(*this); + // Copy contents from the main DB. + populateMiscDatabase(); + break; + default: + throw std::runtime_error("Unknown DB schema version"); + } + tx.commit(); + + // Detach the source database _after_ commit to avoid "database is locked + // errors". If schema version is already the most recent, DETACH is a no-op. + getRawMiscSession() << "DETACH DATABASE source_db"; +} + +void +dropMiscTablesFromMain(Application& app) +{ + releaseAssert(app.getDatabase().canUseMiscDB()); + auto& db = app.getDatabase(); + db.getRawSession() << "DROP TABLE IF EXISTS peers;"; + db.getRawSession() << "DROP TABLE IF EXISTS ban;"; + db.getRawSession() << "DROP TABLE IF EXISTS quoruminfo;"; + db.getRawSession() << "DROP TABLE IF EXISTS scpquorums;"; + db.getRawSession() << "DROP TABLE IF EXISTS scphistory;"; + db.getRawSession() << "DROP TABLE IF EXISTS pubsub;"; + db.getRawSession() << getDeleteStateQuery(app, /* isMain */ true); } void @@ -209,7 +347,7 @@ Database::applySchemaUpgrade(unsigned long vers) { clearPreparedStatementCache(); - soci::transaction tx(mSession); + soci::transaction tx(mSession.session()); switch (vers) { case 22: @@ -218,6 +356,10 @@ Database::applySchemaUpgrade(unsigned long vers) case 23: mApp.getHistoryManager().dropSQLBasedPublish(); Upgrades::dropSupportUpgradeHistory(*this); + if (canUseMiscDB()) + { + dropMiscTablesFromMain(mApp); + } break; default: throw std::runtime_error("Unknown DB schema version"); @@ -226,116 +368,117 @@ Database::applySchemaUpgrade(unsigned long vers) } void -Database::upgradeToCurrentSchema() +validateVersion(unsigned long vers, unsigned long minVers, + unsigned long maxVers) { - auto vers = getDBSchemaVersion(); - if (vers < MIN_SCHEMA_VERSION) + if (vers < minVers) { std::string s = ("DB schema version " + std::to_string(vers) + " is older than minimum supported schema " + - std::to_string(MIN_SCHEMA_VERSION)); + std::to_string(minVers)); throw std::runtime_error(s); } - if (vers > SCHEMA_VERSION) + if (vers > maxVers) { - std::string s = ("DB schema version " + std::to_string(vers) + - " is newer than application schema " + - std::to_string(SCHEMA_VERSION)); + std::string s = + ("DB schema version " + std::to_string(vers) + + " is newer than application schema " + std::to_string(maxVers)); throw std::runtime_error(s); } - while (vers < SCHEMA_VERSION) - { - ++vers; - CLOG_INFO(Database, "Applying DB schema upgrade to version {}", vers); - applySchemaUpgrade(vers); - putSchemaVersion(vers); - } - - // While not really a schema upgrade, we need to upgrade the DB when - // BucketListDB is enabled. - if (mApp.getConfig().isUsingBucketListDB()) - { - // Tx meta column no longer supported in BucketListDB - dropTxMetaIfExists(); - } - - CLOG_INFO(Database, "DB schema is in current version"); - releaseAssert(vers == SCHEMA_VERSION); } void -Database::dropTxMetaIfExists() +Database::upgradeToCurrentSchema() { - int txMetaExists{}; - std::string selectStr; - if (isSqlite()) - { - selectStr = "SELECT EXISTS (" - "SELECT 1 " - "FROM pragma_table_info('txhistory') " - "WHERE name = 'txmeta');"; - } - else - { - selectStr = "SELECT EXISTS (" - "SELECT 1 " - "FROM information_schema.columns " - "WHERE " - "table_name = 'txhistory' AND " - "column_name = 'txmeta');"; - } - - auto& st = getPreparedStatement(selectStr).statement(); - st.exchange(soci::into(txMetaExists)); - st.define_and_bind(); - st.execute(true); + auto doMigration = [&](unsigned long vers, unsigned long minVers, + unsigned long maxVers, bool isMain) { + validateVersion(vers, minVers, maxVers); + while (vers < maxVers) + { + ++vers; + CLOG_INFO(Database, "{}: Applying DB schema upgrade to version {}", + isMain ? "Main" : "Misc", vers); + if (isMain) + { + applySchemaUpgrade(vers); + putMainSchemaVersion(vers); + } + else if (canUseMiscDB()) + { + applyMiscSchemaUpgrade(vers); + putMiscSchemaVersion(vers); + } + } + releaseAssert(vers == maxVers); + }; - if (txMetaExists) + // First perform migration of the MISC DB + auto mainVers = getMainDBSchemaVersion(); + auto miscVers = 0; + if (mainVers >= FIRST_MAIN_VERSION_WITH_MISC && canUseMiscDB()) { - CLOG_INFO(Database, "Dropping txmeta column from txhistory table"); - getSession() << "ALTER TABLE txhistory DROP COLUMN txmeta;"; + miscVers = getMiscDBSchemaVersion(); + doMigration(miscVers, MIN_MISC_SCHEMA_VERSION, MISC_SCHEMA_VERSION, + false); } + doMigration(mainVers, MIN_MAIN_SCHEMA_VERSION, MAIN_SCHEMA_VERSION, true); + CLOG_INFO(Database, "DB schema is in current version"); } void -Database::putSchemaVersion(unsigned long vers) +Database::putMainSchemaVersion(unsigned long vers) { mApp.getPersistentState().setState(PersistentState::kDatabaseSchema, - std::to_string(vers)); + std::to_string(vers), + mApp.getDatabase().getSession()); } -unsigned long -Database::getDBSchemaVersion() +void +Database::putMiscSchemaVersion(unsigned long vers) +{ + mApp.getPersistentState().setState(PersistentState::kMiscDatabaseSchema, + std::to_string(vers), + mApp.getDatabase().getMiscSession()); +} + +static unsigned long +getVersion(Application& app, PersistentState::Entry const& key, + SessionWrapper& session) { - unsigned long vers = 0; + std::optional vers; try { - auto vstr = mApp.getPersistentState().getState( - PersistentState::kDatabaseSchema); + auto vstr = app.getPersistentState().getState(key, session); vers = std::stoul(vstr); } catch (...) { } - if (vers == 0) + if (!vers) { throw std::runtime_error( "No DB schema version found, try stellar-core new-db"); } - return vers; + return *vers; +} + +unsigned long +Database::getMainDBSchemaVersion() +{ + return getVersion(mApp, PersistentState::kDatabaseSchema, getSession()); } unsigned long -Database::getAppSchemaVersion() +Database::getMiscDBSchemaVersion() { - return SCHEMA_VERSION; + return getVersion(mApp, PersistentState::kMiscDatabaseSchema, + getMiscSession()); } medida::TimerContext Database::getInsertTimer(std::string const& entityName) { - mEntityTypes.insert(entityName); mQueryMeter.Mark(); return mApp.getMetrics() .NewTimer({"database", "insert", entityName}) @@ -345,7 +488,6 @@ Database::getInsertTimer(std::string const& entityName) medida::TimerContext Database::getSelectTimer(std::string const& entityName) { - mEntityTypes.insert(entityName); mQueryMeter.Mark(); return mApp.getMetrics() .NewTimer({"database", "select", entityName}) @@ -355,7 +497,6 @@ Database::getSelectTimer(std::string const& entityName) medida::TimerContext Database::getDeleteTimer(std::string const& entityName) { - mEntityTypes.insert(entityName); mQueryMeter.Mark(); return mApp.getMetrics() .NewTimer({"database", "delete", entityName}) @@ -365,7 +506,6 @@ Database::getDeleteTimer(std::string const& entityName) medida::TimerContext Database::getUpdateTimer(std::string const& entityName) { - mEntityTypes.insert(entityName); mQueryMeter.Mark(); return mApp.getMetrics() .NewTimer({"database", "update", entityName}) @@ -375,7 +515,6 @@ Database::getUpdateTimer(std::string const& entityName) medida::TimerContext Database::getUpsertTimer(std::string const& entityName) { - mEntityTypes.insert(entityName); mQueryMeter.Mark(); return mApp.getMetrics() .NewTimer({"database", "upsert", entityName}) @@ -387,7 +526,8 @@ Database::setCurrentTransactionReadOnly() { if (!isSqlite()) { - auto prep = getPreparedStatement("SET TRANSACTION READ ONLY"); + auto prep = + getPreparedStatement("SET TRANSACTION READ ONLY", getSession()); auto& st = prep.statement(); st.define_and_bind(); st.execute(false); @@ -420,6 +560,12 @@ Database::canUsePool() const return !(mApp.getConfig().DATABASE.value == ("sqlite3://:memory:")); } +bool +Database::canUseMiscDB() const +{ + return canUsePool() && isSqlite(); +} + void Database::clearPreparedStatementCache() { @@ -439,29 +585,23 @@ Database::initialize() clearPreparedStatementCache(); if (isSqlite()) { - // delete the sqlite file directly if possible - std::string fn; - - { - int i; - std::string databaseName, databaseLocation; - soci::statement st = - (mSession.prepare << "PRAGMA database_list;", soci::into(i), - soci::into(databaseName), soci::into(databaseLocation)); - st.execute(true); - while (st.got_data()) + auto cleanup = [&](soci::session& sess) { + std::string fn = getSQLiteDBLocation(sess); + if (!fn.empty() && fs::exists(fn)) { - if (databaseName == "main") - { - fn = databaseLocation; - break; - } + sess.close(); + std::remove(fn.c_str()); + return true; } + return false; + }; + bool shouldOpen = cleanup(mSession.session()); + if (canUseMiscDB()) + { + releaseAssert(cleanup(mMiscSession.session()) == shouldOpen); } - if (!fn.empty() && fs::exists(fn)) + if (shouldOpen) { - mSession.close(); - std::remove(fn.c_str()); open(); } } @@ -470,7 +610,7 @@ Database::initialize() // only time this section should be modified is when // consolidating changes found in applySchemaUpgrade here - Upgrades::dropAll(*this); + Upgrades::dropSupportUpgradeHistory(*this); OverlayManager::dropAll(*this); PersistentState::dropAll(*this); ExternalQueue::dropAll(*this); @@ -481,15 +621,18 @@ Database::initialize() HistoryManager::dropAll(*this); HerderPersistence::dropAll(*this); BanManager::dropAll(*this); - putSchemaVersion(MIN_SCHEMA_VERSION); - mApp.getHerderPersistence().createQuorumTrackingTable(mSession); + putMainSchemaVersion(MIN_MAIN_SCHEMA_VERSION); + if (canUseMiscDB()) + { + putMiscSchemaVersion(MIN_MISC_SCHEMA_VERSION); + } LOG_INFO(DEFAULT_LOG, "* "); LOG_INFO(DEFAULT_LOG, "* The database has been initialized"); LOG_INFO(DEFAULT_LOG, "* "); } -soci::session& +SessionWrapper& Database::getSession() { // global session can only be used from the main thread @@ -497,33 +640,81 @@ Database::getSession() return mSession; } +SessionWrapper& +Database::getMiscSession() +{ + // global session can only be used from the main thread + releaseAssert(threadIsMain()); + // Use the main session if misc DB is not supported (e.g. Postgres) + if (!canUseMiscDB()) + { + return mSession; + } + return mMiscSession; +} + +soci::session& +Database::getRawSession() +{ + return getSession().session(); +} + +soci::session& +Database::getRawMiscSession() +{ + return getMiscSession().session(); +} + soci::connection_pool& -Database::getPool() +createPool(Database const& db, Config const& cfg, + std::unique_ptr& pool, std::string dbName) { - if (!mPool) + if (!pool) { - auto const& c = mApp.getConfig().DATABASE; - if (!canUsePool()) + auto const& c = cfg.DATABASE; + if (!db.canUsePool()) { std::string s("Can't create connection pool to "); s += removePasswordFromConnectionString(c.value); throw std::runtime_error(s); } size_t n = std::thread::hardware_concurrency(); + if (db.canUseMiscDB()) + { + n = std::max(n / 2, 1); + } LOG_INFO(DEFAULT_LOG, "Establishing {}-entry connection pool to: {}", n, removePasswordFromConnectionString(c.value)); - mPool = std::make_unique(n); + pool = std::make_unique(n); for (size_t i = 0; i < n; ++i) { LOG_DEBUG(DEFAULT_LOG, "Opening pool entry {}", i); - soci::session& sess = mPool->at(i); - sess.open(c.value); + soci::session& sess = pool->at(i); + sess.open(dbName); DatabaseConfigureSessionOp op(sess); stellar::doDatabaseTypeSpecificOperation(sess, op); } } - releaseAssert(mPool); - return *mPool; + releaseAssert(pool); + return *pool; +} + +soci::connection_pool& +Database::getPool() +{ + return createPool(*this, mApp.getConfig(), mPool, + mApp.getConfig().DATABASE.value); +} + +soci::connection_pool& +Database::getMiscPool() +{ + if (!canUseMiscDB()) + { + throw std::runtime_error("Can't use misc pool"); + } + return createPool(*this, mApp.getConfig(), mMiscPool, + getMiscDBName(mApp.getConfig().DATABASE.value)); } class SQLLogContext : NonCopyable @@ -563,16 +754,18 @@ class SQLLogContext : NonCopyable }; StatementContext -Database::getPreparedStatement(std::string const& query) +Database::getPreparedStatement(std::string const& query, + SessionWrapper& session) { - auto i = mStatements.find(query); + auto cacheKey = PrepStatementCacheKey(session.getSessionName(), query); + auto i = mStatements.find(cacheKey); std::shared_ptr p; if (i == mStatements.end()) { - p = std::make_shared(mSession); + p = std::make_shared(session.session()); p->alloc(); p->prepare(query); - mStatements.insert(std::make_pair(query, p)); + mStatements.insert(std::make_pair(cacheKey, p)); mStatementsSize.set_count(mStatements.size()); } else @@ -586,6 +779,6 @@ Database::getPreparedStatement(std::string const& query) std::shared_ptr Database::captureAndLogSQL(std::string contextName) { - return make_shared(contextName, mSession); + return make_shared(contextName, mSession.session()); } } diff --git a/src/database/Database.h b/src/database/Database.h index e3ad43b214..907af57778 100644 --- a/src/database/Database.h +++ b/src/database/Database.h @@ -27,6 +27,18 @@ namespace stellar class Application; class SQLLogContext; +using PreparedStatementCache = + std::map>; + +// smallest schema version supported +static constexpr unsigned long MIN_MAIN_SCHEMA_VERSION = 21; +static constexpr unsigned long MAIN_SCHEMA_VERSION = 23; +static constexpr unsigned long FIRST_MAIN_VERSION_WITH_MISC = 23; + +// Misc schema version 0 means no misc table exists yet +static constexpr unsigned long MIN_MISC_SCHEMA_VERSION = 0; +static constexpr unsigned long MISC_SCHEMA_VERSION = 1; + /** * Helper class for borrowing a SOCI prepared statement handle into a local * scope and cleaning it up once done with it. Returned by @@ -60,6 +72,28 @@ class StatementContext : NonCopyable } }; +class SessionWrapper : NonCopyable +{ + soci::session mSession; + std::string const mSessionName; + + public: + SessionWrapper(std::string sessionName) + : mSessionName(std::move(sessionName)) + { + } + soci::session& + session() + { + return mSession; + } + std::string const& + getSessionName() const + { + return mSessionName; + } +}; + /** * Object that owns the database connection(s) that an application * uses to store the current ledger and other persistent state in. @@ -84,22 +118,45 @@ class StatementContext : NonCopyable * (SQL isolation level 'SERIALIZABLE' in Postgresql and Sqlite, neither of * which provide true serializability). */ + +struct PairHash +{ + std::size_t + operator()(const std::pair& key) const + { + return std::hash{}(key.first) ^ + (std::hash{}(key.second) << 1); + } +}; + class Database : NonMovableOrCopyable { Application& mApp; medida::Meter& mQueryMeter; - soci::session mSession; + SessionWrapper mSession; + SessionWrapper mMiscSession; + std::unique_ptr mPool; + std::unique_ptr mMiscPool; - std::map> mStatements; + // Cache key -> session name <> query + using PrepStatementCacheKey = std::pair; + std::unordered_map, + PairHash> + mStatements; medida::Counter& mStatementsSize; - std::set mEntityTypes; - static bool gDriversRegistered; static void registerDrivers(); void applySchemaUpgrade(unsigned long vers); + void applyMiscSchemaUpgrade(unsigned long vers); void open(); + // Save `vers` as schema version of main DB. + void putMainSchemaVersion(unsigned long vers); + // Save `vers` as schema version of misc DB. + void putMiscSchemaVersion(unsigned long vers); + void populateMiscDatabase(); + std::string getSQLiteDBLocation(soci::session& session); public: // Instantiate object and connect to app.getConfig().DATABASE; @@ -118,8 +175,10 @@ class Database : NonMovableOrCopyable // Return a helper object that borrows, from the Database, a prepared // statement handle for the provided query. The prepared statement handle // is created if necessary before borrowing, and reset (unbound from data) - // when the statement context is destroyed. - StatementContext getPreparedStatement(std::string const& query); + // when the statement context is destroyed. Prepared statements caches are + // per DB session. + StatementContext getPreparedStatement(std::string const& query, + SessionWrapper& session); // Purge all cached prepared statements, closing their handles with the // database. @@ -142,6 +201,10 @@ class Database : NonMovableOrCopyable // Return true if the Database target is SQLite, otherwise false. bool isSqlite() const; + // Return true is the Database can use a miscellaneous database + // Currently, misc db is only supported by on-disk SQLite + bool canUseMiscDB() const; + // Return an optional SQL COLLATION clause to use for text-typed columns in // this database, in order to ensure they're compared "simply" using // byte-value comparisons, i.e. in a non-language-sensitive fashion. For @@ -151,7 +214,8 @@ class Database : NonMovableOrCopyable // Call `op` back with the specific database backend subtype in use. template - T doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation& op); + T doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation& op, + SessionWrapper& session); // Return true if a connection pool is available for worker threads // to read from the database through, otherwise false. @@ -161,26 +225,26 @@ class Database : NonMovableOrCopyable // by the new-db command on stellar-core. void initialize(); - // Save `vers` as schema version. - void putSchemaVersion(unsigned long vers); - - // Get current schema version in DB. - unsigned long getDBSchemaVersion(); - - // Get current schema version of running application. - unsigned long getAppSchemaVersion(); + // Get current schema version of main DB. + unsigned long getMainDBSchemaVersion(); + // Get current schema version of misc DB. + unsigned long getMiscDBSchemaVersion(); // Check schema version and apply any upgrades if necessary. void upgradeToCurrentSchema(); - void dropTxMetaIfExists(); - + // Soci named session wrapper + SessionWrapper& getSession(); + SessionWrapper& getMiscSession(); // Access the underlying SOCI session object - soci::session& getSession(); + // Use these to directly access the soci session object + soci::session& getRawSession(); + soci::session& getRawMiscSession(); // Access the optional SOCI connection pool available for worker // threads. Throws an error if !canUsePool(). soci::connection_pool& getPool(); + soci::connection_pool& getMiscPool(); }; template @@ -208,75 +272,10 @@ doDatabaseTypeSpecificOperation(soci::session& session, template T -Database::doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation& op) -{ - return stellar::doDatabaseTypeSpecificOperation(mSession, op); -} - -// Select a set of records using a client-defined query string, then map -// each record into an element of a client-defined datatype by applying a -// client-defined function (the records are accumulated in the "out" -// vector). -template -void -selectMap(Database& db, std::string const& selectStr, - std::function makeT, std::vector& out) -{ - soci::rowset rs = (db.getSession().prepare << selectStr); - - std::transform(rs.begin(), rs.end(), std::back_inserter(out), makeT); -} - -// Map each element in the given vector of a client-defined datatype into a -// SQL update command by applying a client-defined function, then send those -// update strings to the database. -// -// The "postUpdate" function receives the number of records affected -// by the given update, as well as the element of the client-defined -// datatype which generated that update. -template -void updateMap(Database& db, std::vector const& in, - std::string const& updateStr, - std::function prepUpdate, - std::function postUpdate); -template -void -updateMap(Database& db, std::vector const& in, std::string const& updateStr, - std::function prepUpdate, - std::function postUpdate) -{ - auto st_update = db.getPreparedStatement(updateStr).statement(); - - for (auto& recT : in) - { - prepUpdate(st_update, recT); - st_update.define_and_bind(); - st_update.execute(true); - auto affected_rows = st_update.get_affected_rows(); - st_update.clean_up(false); - postUpdate(affected_rows, recT); - } -} - -// The composition of updateMap() following selectMap(). -// -// Returns the number of records selected by selectMap() (all of which were -// then passed through updateMap() before the selectUpdateMap() call -// returned). -template -size_t -selectUpdateMap(Database& db, std::string const& selectStr, - std::function makeT, - std::string const& updateStr, - std::function prepUpdate, - std::function postUpdate) +Database::doDatabaseTypeSpecificOperation(DatabaseTypeSpecificOperation& op, + SessionWrapper& session) { - std::vector vecT; - - selectMap(db, selectStr, makeT, vecT); - updateMap(db, vecT, updateStr, prepUpdate, postUpdate); - - return vecT.size(); + return stellar::doDatabaseTypeSpecificOperation(session.session(), op); } template diff --git a/src/database/test/DatabaseTests.cpp b/src/database/test/DatabaseTests.cpp index 4a17cd565c..2e483cf8cd 100644 --- a/src/database/test/DatabaseTests.cpp +++ b/src/database/test/DatabaseTests.cpp @@ -32,7 +32,7 @@ transactionTest(Application::pointer app) int a0 = a + 1; int a1 = a + 2; - auto& session = app->getDatabase().getSession(); + auto& session = app->getDatabase().getRawSession(); session << "DROP TABLE IF EXISTS test"; session << "CREATE TABLE test (x INTEGER)"; @@ -104,7 +104,7 @@ checkMVCCIsolation(Application::pointer app) int s2r1 = 0, s2r2 = 0, s2r3 = 0, s2r4 = 0; - auto& sess1 = app->getDatabase().getSession(); + auto& sess1 = app->getDatabase().getRawSession(); sess1 << "DROP TABLE IF EXISTS test"; sess1 << "CREATE TABLE test (x INTEGER)"; @@ -217,7 +217,7 @@ TEST_CASE("postgres smoketest", "[db]") Application::pointer app = createTestApplication(clock, cfg); int a = 10, b = 0; - auto& session = app->getDatabase().getSession(); + auto& session = app->getDatabase().getRawSession(); SECTION("round trip") { @@ -249,7 +249,7 @@ TEST_CASE("postgres smoketest", "[db]") SECTION("postgres MVCC test") { - app->getDatabase().getSession() << "drop table if exists test"; + app->getDatabase().getRawSession() << "drop table if exists test"; checkMVCCIsolation(app); } } @@ -279,7 +279,7 @@ TEST_CASE("postgres performance", "[db][pgperf][!hide]") try { Application::pointer app = createTestApplication(clock, cfg); - auto& session = app->getDatabase().getSession(); + auto& session = app->getDatabase().getRawSession(); session << "drop table if exists txtest;"; session << "create table txtest (a bigint, b bigint, c bigint, primary " @@ -355,7 +355,6 @@ TEST_CASE("schema test", "[db]") Application::pointer app = createTestApplication(clock, cfg); auto& db = app->getDatabase(); - auto dbv = db.getDBSchemaVersion(); - auto av = db.getAppSchemaVersion(); - REQUIRE(dbv == av); + auto dbv = db.getMainDBSchemaVersion(); + REQUIRE(dbv == MAIN_SCHEMA_VERSION); } diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp index d2d3565c1a..e29f5fa2f8 100644 --- a/src/herder/HerderImpl.cpp +++ b/src/herder/HerderImpl.cpp @@ -2057,16 +2057,18 @@ void HerderImpl::persistUpgrades() { ZoneScoped; + releaseAssert(threadIsMain()); auto s = mUpgrades.getParameters().toJson(); - mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s); + mApp.getPersistentState().setState(PersistentState::kLedgerUpgrades, s, + mApp.getDatabase().getMiscSession()); } void HerderImpl::restoreUpgrades() { ZoneScoped; - std::string s = - mApp.getPersistentState().getState(PersistentState::kLedgerUpgrades); + std::string s = mApp.getPersistentState().getState( + PersistentState::kLedgerUpgrades, mApp.getDatabase().getMiscSession()); if (!s.empty()) { Upgrades::UpgradeParameters p; diff --git a/src/herder/HerderPersistence.h b/src/herder/HerderPersistence.h index e5d44b0cc5..ea82cd747b 100644 --- a/src/herder/HerderPersistence.h +++ b/src/herder/HerderPersistence.h @@ -49,6 +49,5 @@ class HerderPersistence static void dropAll(Database& db); static void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count); - static void createQuorumTrackingTable(soci::session& sess); }; } diff --git a/src/herder/HerderPersistenceImpl.cpp b/src/herder/HerderPersistenceImpl.cpp index bd743e8072..dbc1415a9b 100644 --- a/src/herder/HerderPersistenceImpl.cpp +++ b/src/herder/HerderPersistenceImpl.cpp @@ -47,12 +47,13 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, auto usedQSets = UnorderedMap{}; auto& db = mApp.getDatabase(); + auto& sess = db.getMiscSession(); - soci::transaction txscope(db.getSession()); + soci::transaction txscope(sess.session()); { auto prepClean = db.getPreparedStatement( - "DELETE FROM scphistory WHERE ledgerseq =:l"); + "DELETE FROM scphistory WHERE ledgerseq =:l", sess); auto& st = prepClean.statement(); st.exchange(soci::use(seq)); @@ -92,7 +93,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, auto prepEnv = db.getPreparedStatement("INSERT INTO scphistory " "(nodeid, ledgerseq, envelope) VALUES " - "(:n, :l, :e)"); + "(:n, :l, :e)", + sess); auto& st = prepEnv.statement(); st.exchange(soci::use(nodeIDs, "n")); st.exchange(soci::use(seqs, "l")); @@ -124,7 +126,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, std::string qSetHHex(binToHex(qSetH)); auto prep = db.getPreparedStatement( - "UPDATE quoruminfo SET qsethash = :h WHERE nodeid = :id"); + "UPDATE quoruminfo SET qsethash = :h WHERE nodeid = :id", sess); auto& st = prep.statement(); st.exchange(soci::use(qSetHHex)); st.exchange(soci::use(nodeIDStrKey)); @@ -136,7 +138,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, if (st.get_affected_rows() != 1) { auto prepI = db.getPreparedStatement( - "INSERT INTO quoruminfo (nodeid, qsethash) VALUES (:id, :h)"); + "INSERT INTO quoruminfo (nodeid, qsethash) VALUES (:id, :h)", + sess); auto& stI = prepI.statement(); stI.exchange(soci::use(nodeIDStrKey)); stI.exchange(soci::use(qSetHHex)); @@ -158,7 +161,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, uint32_t lastSeenSeq; auto prepSelQSet = db.getPreparedStatement( - "SELECT lastledgerseq FROM scpquorums WHERE qsethash = :h"); + "SELECT lastledgerseq FROM scpquorums WHERE qsethash = :h", sess); auto& stSel = prepSelQSet.statement(); stSel.exchange(soci::into(lastSeenSeq)); stSel.exchange(soci::use(qSetH)); @@ -177,7 +180,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, auto prepUpQSet = db.getPreparedStatement( "UPDATE scpquorums SET " - "lastledgerseq = :l WHERE qsethash = :h"); + "lastledgerseq = :l WHERE qsethash = :h", + sess); auto& stUp = prepUpQSet.statement(); stUp.exchange(soci::use(seq)); @@ -202,7 +206,8 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, auto prepInsQSet = db.getPreparedStatement( "INSERT INTO scpquorums " "(qsethash, lastledgerseq, qset) VALUES " - "(:h, :l, :v);"); + "(:h, :l, :v);", + sess); auto& stIns = prepInsQSet.statement(); stIns.exchange(soci::use(qSetH)); @@ -372,38 +377,35 @@ void HerderPersistence::dropAll(Database& db) { ZoneScoped; - db.getSession() << "DROP TABLE IF EXISTS scphistory"; + db.getRawMiscSession() << "DROP TABLE IF EXISTS scphistory"; - db.getSession() << "DROP TABLE IF EXISTS scpquorums"; + db.getRawMiscSession() << "DROP TABLE IF EXISTS scpquorums"; - db.getSession() << "CREATE TABLE scphistory (" - "nodeid CHARACTER(56) NOT NULL," - "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0)," - "envelope TEXT NOT NULL" - ")"; + db.getRawMiscSession() << "CREATE TABLE scphistory (" + "nodeid CHARACTER(56) NOT NULL," + "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0)," + "envelope TEXT NOT NULL" + ")"; - db.getSession() << "CREATE INDEX scpenvsbyseq ON scphistory(ledgerseq)"; + db.getRawMiscSession() + << "CREATE INDEX scpenvsbyseq ON scphistory(ledgerseq)"; - db.getSession() << "CREATE TABLE scpquorums (" - "qsethash CHARACTER(64) NOT NULL," - "lastledgerseq INT NOT NULL CHECK (lastledgerseq >= 0)," - "qset TEXT NOT NULL," - "PRIMARY KEY (qsethash)" - ")"; + db.getRawMiscSession() + << "CREATE TABLE scpquorums (" + "qsethash CHARACTER(64) NOT NULL," + "lastledgerseq INT NOT NULL CHECK (lastledgerseq >= 0)," + "qset TEXT NOT NULL," + "PRIMARY KEY (qsethash)" + ")"; - db.getSession() + db.getRawMiscSession() << "CREATE INDEX scpquorumsbyseq ON scpquorums(lastledgerseq)"; - db.getSession() << "DROP TABLE IF EXISTS quoruminfo"; -} - -void -HerderPersistence::createQuorumTrackingTable(soci::session& sess) -{ - sess << "CREATE TABLE quoruminfo (" - "nodeid CHARACTER(56) NOT NULL," - "qsethash CHARACTER(64) NOT NULL," - "PRIMARY KEY (nodeid))"; + db.getRawMiscSession() << "DROP TABLE IF EXISTS quoruminfo"; + db.getRawMiscSession() << "CREATE TABLE quoruminfo (" + "nodeid CHARACTER(56) NOT NULL," + "qsethash CHARACTER(64) NOT NULL," + "PRIMARY KEY (nodeid))"; } void @@ -411,9 +413,9 @@ HerderPersistence::deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) { ZoneScoped; - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, - "scphistory", "ledgerseq"); - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, - "scpquorums", "lastledgerseq"); + DatabaseUtils::deleteOldEntriesHelper(db.getRawMiscSession(), ledgerSeq, + count, "scphistory", "ledgerseq"); + DatabaseUtils::deleteOldEntriesHelper(db.getRawMiscSession(), ledgerSeq, + count, "scpquorums", "lastledgerseq"); } } diff --git a/src/herder/PendingEnvelopes.cpp b/src/herder/PendingEnvelopes.cpp index 4db92c5619..3bd00a3a02 100644 --- a/src/herder/PendingEnvelopes.cpp +++ b/src/herder/PendingEnvelopes.cpp @@ -745,7 +745,8 @@ PendingEnvelopes::getQSet(Hash const& hash) else { auto& db = mApp.getDatabase(); - qset = HerderPersistence::getQuorumSet(db, db.getSession(), hash); + qset = + HerderPersistence::getQuorumSet(db, db.getRawMiscSession(), hash); } if (qset) { @@ -814,7 +815,7 @@ PendingEnvelopes::rebuildQuorumTrackerState() // see if we had some information for that node auto& db = mApp.getDatabase(); auto h = HerderPersistence::getNodeQuorumSet( - db, db.getSession(), id); + db, db.getRawMiscSession(), id); if (h) { res = getQSet(*h); diff --git a/src/herder/Upgrades.cpp b/src/herder/Upgrades.cpp index 689b7d0139..a24fbac2ed 100644 --- a/src/herder/Upgrades.cpp +++ b/src/herder/Upgrades.cpp @@ -690,22 +690,22 @@ Upgrades::timeForUpgrade(uint64_t time) const void Upgrades::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS upgradehistory"; - db.getSession() << "CREATE TABLE upgradehistory (" - "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0), " - "upgradeindex INT NOT NULL, " - "upgrade TEXT NOT NULL, " - "changes TEXT NOT NULL, " - "PRIMARY KEY (ledgerseq, upgradeindex)" - ")"; - db.getSession() + db.getRawSession() << "DROP TABLE IF EXISTS upgradehistory"; + db.getRawSession() << "CREATE TABLE upgradehistory (" + "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0), " + "upgradeindex INT NOT NULL, " + "upgrade TEXT NOT NULL, " + "changes TEXT NOT NULL, " + "PRIMARY KEY (ledgerseq, upgradeindex)" + ")"; + db.getRawSession() << "CREATE INDEX upgradehistbyseq ON upgradehistory (ledgerseq);"; } void Upgrades::dropSupportUpgradeHistory(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS upgradehistory"; + db.getRawSession() << "DROP TABLE IF EXISTS upgradehistory"; } static void diff --git a/src/herder/test/HerderTests.cpp b/src/herder/test/HerderTests.cpp index 9366adb7c3..1fd964b319 100644 --- a/src/herder/test/HerderTests.cpp +++ b/src/herder/test/HerderTests.cpp @@ -226,7 +226,7 @@ TEST_CASE_VERSIONS("standalone", "[herder][acceptance]") app->getCommandHandler().manualCmd("setcursor?id=A1&cursor=1"); app->getCommandHandler().manualCmd("maintenance?queue=true"); auto& db = app->getDatabase(); - auto& sess = db.getSession(); + auto& sess = db.getRawSession(); app->getCommandHandler().manualCmd("setcursor?id=A2&cursor=3"); app->getCommandHandler().manualCmd("maintenance?queue=true"); @@ -3395,15 +3395,15 @@ TEST_CASE("overlay parallel processing") // soroban traffic currLoadGenCount = loadGenDone.count(); auto secondLoadGenCount = secondLoadGenDone.count(); - uint32_t const classicTxCount = 200; + uint32_t const txCount = 100; // Generate Soroban txs from one node loadGen.generateLoad(GeneratedLoadConfig::txLoad( LoadGenMode::SOROBAN_UPLOAD, 50, - /* nTxs */ 500, desiredTxRate, /* offset */ 0)); + /* nTxs */ txCount, desiredTxRate, /* offset */ 0)); // Generate classic txs from another node (with offset to prevent // overlapping accounts) secondLoadGen.generateLoad(GeneratedLoadConfig::txLoad( - LoadGenMode::PAY, 50, classicTxCount, desiredTxRate, + LoadGenMode::PAY, 50, txCount, desiredTxRate, /* offset */ 50)); simulation->crankUntil( @@ -5465,7 +5465,8 @@ TEST_CASE("SCP message capture from previous ledger", "[herder]") // Prepare query auto& db = node->getDatabase(); auto prep = db.getPreparedStatement( - "SELECT envelope FROM scphistory WHERE ledgerseq = :l"); + "SELECT envelope FROM scphistory WHERE ledgerseq = :l", + db.getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(ledgerNum)); std::string envStr; diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index f363cbb090..5389326fea 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -59,8 +59,8 @@ static std::string kSQLCreateStatement = void HistoryManager::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS publishqueue;"; - soci::statement st = db.getSession().prepare << kSQLCreateStatement; + db.getRawSession() << "DROP TABLE IF EXISTS publishqueue;"; + soci::statement st = db.getRawSession().prepare << kSQLCreateStatement; st.execute(true); } @@ -149,7 +149,8 @@ HistoryManagerImpl::dropSQLBasedPublish() // Migrate all the existing queued checkpoints to the new format { std::string state; - auto prep = db.getPreparedStatement("SELECT state FROM publishqueue;"); + auto prep = + db.getPreparedStatement("SELECT state FROM publishqueue;", sess); auto& st = prep.statement(); st.exchange(soci::into(state)); st.define_and_bind(); @@ -170,9 +171,9 @@ HistoryManagerImpl::dropSQLBasedPublish() for (auto const& checkpoint : checkpointLedgers) { auto begin = firstLedgerInCheckpointContaining(checkpoint); - populateCheckpointFilesFromDB(mApp, sess, begin, freq, + populateCheckpointFilesFromDB(mApp, sess.session(), begin, freq, mCheckpointBuilder); - LedgerHeaderUtils::copyToStream(db, sess, begin, freq, + LedgerHeaderUtils::copyToStream(db, sess.session(), begin, freq, mCheckpointBuilder); // Checkpoints in publish queue are complete, so we can finalize them mCheckpointBuilder.checkpointComplete(checkpoint); @@ -184,17 +185,17 @@ HistoryManagerImpl::dropSQLBasedPublish() { // Then, reconstruct any partial checkpoints that haven't yet been // queued - populateCheckpointFilesFromDB(mApp, sess, + populateCheckpointFilesFromDB(mApp, sess.session(), firstLedgerInCheckpointContaining(lcl), freq, mCheckpointBuilder); - LedgerHeaderUtils::copyToStream(db, sess, + LedgerHeaderUtils::copyToStream(db, sess.session(), firstLedgerInCheckpointContaining(lcl), freq, mCheckpointBuilder); } db.clearPreparedStatementCache(); // Now it's safe to drop obsolete SQL tables - sess << "DROP TABLE IF EXISTS publishqueue;"; + sess.session() << "DROP TABLE IF EXISTS publishqueue;"; dropSupportTxHistory(db); dropSupportTxSetHistory(db); } diff --git a/src/history/StateSnapshot.cpp b/src/history/StateSnapshot.cpp index 3995aeea1b..2dfd293d4e 100644 --- a/src/history/StateSnapshot.cpp +++ b/src/history/StateSnapshot.cpp @@ -55,10 +55,11 @@ StateSnapshot::writeSCPMessages() const { ZoneScoped; std::unique_ptr snapSess( - mApp.getDatabase().canUsePool() - ? std::make_unique(mApp.getDatabase().getPool()) - : nullptr); - soci::session& sess(snapSess ? *snapSess : mApp.getDatabase().getSession()); + (mApp.getDatabase().canUsePool() + ? std::make_unique(mApp.getDatabase().getMiscPool()) + : nullptr)); + soci::session& sess(snapSess ? *snapSess + : mApp.getDatabase().getRawMiscSession()); soci::transaction tx(sess); // The current "history block" is stored in _four_ files, one just ledger diff --git a/src/invariant/BucketListIsConsistentWithDatabase.cpp b/src/invariant/BucketListIsConsistentWithDatabase.cpp index e12da7b724..2e4e5d9b8c 100644 --- a/src/invariant/BucketListIsConsistentWithDatabase.cpp +++ b/src/invariant/BucketListIsConsistentWithDatabase.cpp @@ -8,6 +8,7 @@ #include "bucket/BucketList.h" #include "bucket/BucketManager.h" #include "crypto/Hex.h" +#include "database/Database.h" #include "history/HistoryArchive.h" #include "invariant/InvariantManager.h" #include "ledger/LedgerManager.h" @@ -251,7 +252,8 @@ BucketListIsConsistentWithDatabase::checkEntireBucketlist() } if (mApp.getConfig().isUsingBucketListDB() && - mApp.getPersistentState().getState(PersistentState::kDBBackend) != + mApp.getPersistentState().getState(PersistentState::kDBBackend, + mApp.getDatabase().getSession()) != BucketIndex::DB_BACKEND_STATE) { throw std::runtime_error("BucketListDB enabled but BucketListDB flag " diff --git a/src/ledger/InMemoryLedgerTxn.cpp b/src/ledger/InMemoryLedgerTxn.cpp index bcdaca07a2..2c13c12b8e 100644 --- a/src/ledger/InMemoryLedgerTxn.cpp +++ b/src/ledger/InMemoryLedgerTxn.cpp @@ -54,6 +54,12 @@ InMemoryLedgerTxn::FilteredEntryIteratorImpl::entryPtr() const return mIter.entryPtr(); } +SessionWrapper& +InMemoryLedgerTxn::getSession() const +{ + return mDb.getSession(); +} + bool InMemoryLedgerTxn::FilteredEntryIteratorImpl::entryExists() const { @@ -93,7 +99,7 @@ InMemoryLedgerTxn::addChild(AbstractLedgerTxn& child, TransactionMode mode) LedgerTxn::addChild(child, mode); if (mode == TransactionMode::READ_WRITE_WITH_SQL_TXN) { - mTransaction = std::make_unique(mDb.getSession()); + mTransaction = std::make_unique(mDb.getRawSession()); } } diff --git a/src/ledger/InMemoryLedgerTxn.h b/src/ledger/InMemoryLedgerTxn.h index 76cf56fcae..5bfc15ce75 100644 --- a/src/ledger/InMemoryLedgerTxn.h +++ b/src/ledger/InMemoryLedgerTxn.h @@ -100,6 +100,7 @@ class InMemoryLedgerTxn : public LedgerTxn UnorderedMap getPoolShareTrustLinesByAccountAndAsset(AccountID const& account, Asset const& asset) override; + SessionWrapper& getSession() const override; }; } diff --git a/src/ledger/InMemoryLedgerTxnRoot.cpp b/src/ledger/InMemoryLedgerTxnRoot.cpp index 386ceb2e93..9df8c05346 100644 --- a/src/ledger/InMemoryLedgerTxnRoot.cpp +++ b/src/ledger/InMemoryLedgerTxnRoot.cpp @@ -189,6 +189,12 @@ void InMemoryLedgerTxnRoot::prepareNewObjects(size_t) { } +SessionWrapper& +InMemoryLedgerTxnRoot::getSession() const +{ + throw std::runtime_error("ERROR!!"); +} + #ifdef BUILD_TESTS void InMemoryLedgerTxnRoot::resetForFuzzer() diff --git a/src/ledger/InMemoryLedgerTxnRoot.h b/src/ledger/InMemoryLedgerTxnRoot.h index 5d4bc3fe19..9b40fc2191 100644 --- a/src/ledger/InMemoryLedgerTxnRoot.h +++ b/src/ledger/InMemoryLedgerTxnRoot.h @@ -86,6 +86,7 @@ class InMemoryLedgerTxnRoot : public AbstractLedgerTxnParent LedgerKeyMeter* lkMeter) override; void prepareNewObjects(size_t s) override; + SessionWrapper& getSession() const override; #ifdef BUILD_TESTS void resetForFuzzer() override; diff --git a/src/ledger/LedgerHeaderUtils.cpp b/src/ledger/LedgerHeaderUtils.cpp index 0835439355..41eca7352e 100644 --- a/src/ledger/LedgerHeaderUtils.cpp +++ b/src/ledger/LedgerHeaderUtils.cpp @@ -43,7 +43,7 @@ isValid(LedgerHeader const& lh) } void -storeInDatabase(Database& db, LedgerHeader const& header) +storeInDatabase(Database& db, LedgerHeader const& header, SessionWrapper& sess) { ZoneScoped; if (!isValid(header)) @@ -64,7 +64,8 @@ storeInDatabase(Database& db, LedgerHeader const& header) "INSERT INTO ledgerheaders " "(ledgerhash, prevhash, bucketlisthash, ledgerseq, closetime, data) " "VALUES " - "(:h, :ph, :blh, :seq, :ct, :data)"); + "(:h, :ph, :blh, :seq, :ct, :data)", + sess); auto& st = prep.statement(); st.exchange(soci::use(hash)); st.exchange(soci::use(prevHash)); @@ -112,7 +113,8 @@ loadByHash(Database& db, Hash const& hash) std::string headerEncoded; auto prep = db.getPreparedStatement("SELECT data FROM ledgerheaders " - "WHERE ledgerhash = :h"); + "WHERE ledgerhash = :h", + db.getSession()); auto& st = prep.statement(); st.exchange(soci::into(headerEncoded)); st.exchange(soci::use(hash_s)); @@ -144,8 +146,8 @@ loadMaxLedgerSeq(Database& db) ZoneScoped; uint32_t seq = 0; soci::indicator maxIndicator; - auto prep = - db.getPreparedStatement("SELECT MAX(ledgerseq) FROM ledgerheaders"); + auto prep = db.getPreparedStatement( + "SELECT MAX(ledgerseq) FROM ledgerheaders", db.getSession()); auto& st = prep.statement(); st.exchange(soci::into(seq, maxIndicator)); st.define_and_bind(); @@ -191,7 +193,7 @@ void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) { ZoneScoped; - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, + DatabaseUtils::deleteOldEntriesHelper(db.getRawSession(), ledgerSeq, count, "ledgerheaders", "ledgerseq"); } @@ -232,17 +234,17 @@ dropAll(Database& db) { std::string coll = db.getSimpleCollationClause(); - db.getSession() << "DROP TABLE IF EXISTS ledgerheaders;"; - db.getSession() << "CREATE TABLE ledgerheaders (" - << "ledgerhash CHARACTER(64) " << coll - << " PRIMARY KEY," - << "prevhash CHARACTER(64) NOT NULL," - "bucketlisthash CHARACTER(64) NOT NULL," - "ledgerseq INT UNIQUE CHECK (ledgerseq >= 0)," - "closetime BIGINT NOT NULL CHECK (closetime >= 0)," - "data TEXT NOT NULL" - ");"; - db.getSession() + db.getRawSession() << "DROP TABLE IF EXISTS ledgerheaders;"; + db.getRawSession() + << "CREATE TABLE ledgerheaders (" + << "ledgerhash CHARACTER(64) " << coll << " PRIMARY KEY," + << "prevhash CHARACTER(64) NOT NULL," + "bucketlisthash CHARACTER(64) NOT NULL," + "ledgerseq INT UNIQUE CHECK (ledgerseq >= 0)," + "closetime BIGINT NOT NULL CHECK (closetime >= 0)," + "data TEXT NOT NULL" + ");"; + db.getRawSession() << "CREATE INDEX ledgersbyseq ON ledgerheaders ( ledgerseq );"; } } diff --git a/src/ledger/LedgerHeaderUtils.h b/src/ledger/LedgerHeaderUtils.h index e165570e1a..d67bed277d 100644 --- a/src/ledger/LedgerHeaderUtils.h +++ b/src/ledger/LedgerHeaderUtils.h @@ -18,7 +18,8 @@ uint32_t getFlags(LedgerHeader const& lh); bool isValid(LedgerHeader const& lh); -void storeInDatabase(Database& db, LedgerHeader const& header); +void storeInDatabase(Database& db, LedgerHeader const& header, + SessionWrapper& sess); LedgerHeader decodeFromData(std::string const& data); diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index 8e7c5841b2..9b67b95d42 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -290,8 +290,8 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist, ZoneScoped; // Step 1. Load LCL state from the DB and extract latest ledger hash - string lastLedger = - mApp.getPersistentState().getState(PersistentState::kLastClosedLedger); + string lastLedger = mApp.getPersistentState().getState( + PersistentState::kLastClosedLedger, mApp.getDatabase().getSession()); if (lastLedger.empty()) { @@ -434,12 +434,16 @@ LedgerManagerImpl::setupInMemoryStateRebuild() LedgerHeader lh; HistoryArchiveState has; auto& ps = mApp.getPersistentState(); - ps.setState(PersistentState::kLastClosedLedger, - binToHex(xdrSha256(lh))); - ps.setState(PersistentState::kHistoryArchiveState, has.toString()); - ps.setState(PersistentState::kLastSCPData, ""); - ps.setState(PersistentState::kLastSCPDataXDR, ""); - ps.setState(PersistentState::kLedgerUpgrades, ""); + ps.setState(PersistentState::kLastClosedLedger, binToHex(xdrSha256(lh)), + getDatabase().getSession()); + ps.setState(PersistentState::kHistoryArchiveState, has.toString(), + getDatabase().getSession()); + ps.setState(PersistentState::kLastSCPData, "", + getDatabase().getMiscSession()); + ps.setState(PersistentState::kLastSCPDataXDR, "", + getDatabase().getMiscSession()); + ps.setState(PersistentState::kLedgerUpgrades, "", + getDatabase().getMiscSession()); mRebuildInMemoryState = true; } } @@ -541,7 +545,7 @@ LedgerManagerImpl::getLastClosedLedgerHAS() ZoneScoped; string hasString = mApp.getPersistentState().getState( - PersistentState::kHistoryArchiveState); + PersistentState::kHistoryArchiveState, mApp.getDatabase().getSession()); HistoryArchiveState has; has.fromString(hasString); return has; @@ -1106,7 +1110,7 @@ LedgerManagerImpl::deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) { ZoneScoped; - soci::transaction txscope(db.getSession()); + soci::transaction txscope(db.getRawSession()); db.clearPreparedStatementCache(); LedgerHeaderUtils::deleteOldEntries(db, ledgerSeq, count); HerderPersistence::deleteOldEntries(db, ledgerSeq, count); @@ -1621,8 +1625,9 @@ LedgerManagerImpl::storeCurrentLedger(LedgerHeader const& header, Hash hash = xdrSha256(header); releaseAssert(!isZero(hash)); + auto& sess = mApp.getLedgerTxnRoot().getSession(); mApp.getPersistentState().setState(PersistentState::kLastClosedLedger, - binToHex(hash)); + binToHex(hash), sess); BucketList bl; if (mApp.getConfig().MODE_ENABLES_BUCKETLIST) @@ -1635,11 +1640,11 @@ LedgerManagerImpl::storeCurrentLedger(LedgerHeader const& header, mApp.getConfig().NETWORK_PASSPHRASE); mApp.getPersistentState().setState(PersistentState::kHistoryArchiveState, - has.toString()); + has.toString(), sess); if (mApp.getConfig().MODE_STORES_HISTORY_LEDGERHEADERS && storeHeader) { - LedgerHeaderUtils::storeInDatabase(mApp.getDatabase(), header); + LedgerHeaderUtils::storeInDatabase(mApp.getDatabase(), header, sess); if (appendToCheckpoint) { mApp.getHistoryManager().appendLedgerHeader(header); diff --git a/src/ledger/LedgerTxn.cpp b/src/ledger/LedgerTxn.cpp index 2085d7c92c..aeb0a92d2c 100644 --- a/src/ledger/LedgerTxn.cpp +++ b/src/ledger/LedgerTxn.cpp @@ -2392,6 +2392,12 @@ LedgerTxn::Impl::hasSponsorshipEntry() const return false; } +SessionWrapper& +LedgerTxn::getSession() const +{ + throw std::runtime_error("Errror"); +} + void LedgerTxn::prepareNewObjects(size_t s) { @@ -2545,6 +2551,21 @@ LedgerTxnRoot::Impl::~Impl() } } +SessionWrapper& +LedgerTxnRoot::Impl::getSession() const +{ + // For now, return main app-wide session; + // When application is done in parallel, mSession will be set to a session + // established from the connection pool. + return mApp.getDatabase().getSession(); +} + +SessionWrapper& +LedgerTxnRoot::getSession() const +{ + return mImpl->getSession(); +} + #ifdef BUILD_TESTS void LedgerTxnRoot::Impl::resetForFuzzer() @@ -2576,8 +2597,8 @@ LedgerTxnRoot::Impl::addChild(AbstractLedgerTxn& child, TransactionMode mode) if (mode == TransactionMode::READ_WRITE_WITH_SQL_TXN) { - mTransaction = std::make_unique( - mApp.getDatabase().getSession()); + mTransaction = + std::make_unique(getSession().session()); } else { @@ -2867,6 +2888,7 @@ LedgerTxnRoot::Impl::commitChild(EntryIterator iter, // std::unique_ptr<...>::reset does not throw mTransaction.reset(); + mSession.reset(); // std::unique_ptr<...>::swap does not throw mHeader.swap(childHeader); @@ -2921,7 +2943,7 @@ LedgerTxnRoot::Impl::countObjects(LedgerEntryType let) const std::string query = "SELECT COUNT(*) FROM " + tableFromLedgerEntryType(let) + ";"; uint64_t count = 0; - mApp.getDatabase().getSession() << query, into(count); + getSession().session() << query, into(count); return count; } @@ -2945,8 +2967,7 @@ LedgerTxnRoot::Impl::countObjects(LedgerEntryType let, uint64_t count = 0; int first = static_cast(ledgers.mFirst); int limit = static_cast(ledgers.limit()); - mApp.getDatabase().getSession() << query, into(count), use(first), - use(limit); + getSession().session() << query, into(count), use(first), use(limit); return count; } @@ -2969,7 +2990,7 @@ LedgerTxnRoot::Impl::deleteObjectsModifiedOnOrAfterLedger(uint32_t ledger) const LedgerEntryType t = static_cast(let); std::string query = "DELETE FROM " + tableFromLedgerEntryType(t) + " WHERE lastmodified >= :v1"; - mApp.getDatabase().getSession() << query, use(ledger); + getSession().session() << query, use(ledger); } } @@ -3837,6 +3858,7 @@ LedgerTxnRoot::Impl::rollbackChild() noexcept { mTransaction->rollback(); mTransaction.reset(); + mSession.reset(); } catch (std::exception& e) { diff --git a/src/ledger/LedgerTxn.h b/src/ledger/LedgerTxn.h index b839ddafc5..cb6b5e8f79 100644 --- a/src/ledger/LedgerTxn.h +++ b/src/ledger/LedgerTxn.h @@ -16,6 +16,7 @@ #include #include #include +#include ///////////////////////////////////////////////////////////////////////////// // Overview @@ -276,6 +277,7 @@ struct InflationVotes; struct LedgerEntry; struct LedgerKey; struct LedgerRange; +class SessionWrapper; struct OfferDescriptor { @@ -538,6 +540,8 @@ class AbstractLedgerTxnParent // prepares to increase the capacity of pending changes by up to "s" changes virtual void prepareNewObjects(size_t s) = 0; + virtual SessionWrapper& getSession() const = 0; + #ifdef BUILD_TESTS virtual void resetForFuzzer() = 0; #endif // BUILD_TESTS @@ -838,6 +842,7 @@ class LedgerTxn : public AbstractLedgerTxn uint32_t prefetchSoroban(UnorderedSet const& keys, LedgerKeyMeter* lkMeter) override; void prepareNewObjects(size_t s) override; + SessionWrapper& getSession() const override; bool hasSponsorshipEntry() const override; @@ -943,5 +948,6 @@ class LedgerTxnRoot : public AbstractLedgerTxnParent OfferDescriptor const* worseThan, std::unordered_set& exclude) override; #endif + SessionWrapper& getSession() const override; }; } diff --git a/src/ledger/LedgerTxnAccountSQL.cpp b/src/ledger/LedgerTxnAccountSQL.cpp index db51158f65..215277e042 100644 --- a/src/ledger/LedgerTxnAccountSQL.cpp +++ b/src/ledger/LedgerTxnAccountSQL.cpp @@ -43,7 +43,8 @@ LedgerTxnRoot::Impl::loadAccount(LedgerKey const& key) const "inflationdest, homedomain, thresholds, " "flags, lastmodified, " "signers, extension, " - "ledgerext FROM accounts WHERE accountid=:v1"); + "ledgerext FROM accounts WHERE accountid=:v1", + getSession()); auto& st = prep.statement(); st.exchange(soci::into(account.balance)); st.exchange(soci::into(account.seqNum)); @@ -109,7 +110,8 @@ LedgerTxnRoot::Impl::loadInflationWinners(size_t maxWinners, "SELECT sum(balance) AS votes, inflationdest" " FROM accounts WHERE inflationdest IS NOT NULL" " AND balance >= 1000000000 GROUP BY inflationdest" - " ORDER BY votes DESC, inflationdest DESC LIMIT :lim"); + " ORDER BY votes DESC, inflationdest DESC LIMIT :lim", + getSession()); auto& st = prep.statement(); st.exchange(soci::into(w.votes)); st.exchange(soci::into(inflationDest)); @@ -134,6 +136,7 @@ LedgerTxnRoot::Impl::loadInflationWinners(size_t maxWinners, class BulkUpsertAccountsOperation : public DatabaseTypeSpecificOperation { Database& mDB; + SessionWrapper& mSession; std::vector mAccountIDs; std::vector mBalances; std::vector mSeqNums; @@ -152,8 +155,9 @@ class BulkUpsertAccountsOperation : public DatabaseTypeSpecificOperation public: BulkUpsertAccountsOperation(Database& DB, - std::vector const& entries) - : mDB(DB) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mSession(session) { mAccountIDs.reserve(entries.size()); mBalances.reserve(entries.size()); @@ -251,7 +255,7 @@ class BulkUpsertAccountsOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.exchange(soci::use(mBalances)); @@ -338,7 +342,7 @@ class BulkUpsertAccountsOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strBalances)); @@ -370,11 +374,13 @@ class BulkDeleteAccountsOperation : public DatabaseTypeSpecificOperation Database& mDB; LedgerTxnConsistency mCons; std::vector mAccountIDs; + SessionWrapper& mSession; public: BulkDeleteAccountsOperation(Database& DB, LedgerTxnConsistency cons, - std::vector const& entries) - : mDB(DB), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mCons(cons), mSession(session) { for (auto const& e : entries) { @@ -391,7 +397,7 @@ class BulkDeleteAccountsOperation : public DatabaseTypeSpecificOperation doSociGenericOperation() { std::string sql = "DELETE FROM accounts WHERE accountid = :id"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.define_and_bind(); @@ -422,7 +428,7 @@ class BulkDeleteAccountsOperation : public DatabaseTypeSpecificOperation std::string sql = "WITH r AS (SELECT unnest(:ids::TEXT[])) " "DELETE FROM accounts WHERE accountid IN (SELECT * FROM r)"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.define_and_bind(); @@ -445,8 +451,8 @@ LedgerTxnRoot::Impl::bulkUpsertAccounts( { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkUpsertAccountsOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertAccountsOperation op(mApp.getDatabase(), entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -455,8 +461,9 @@ LedgerTxnRoot::Impl::bulkDeleteAccounts( { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkDeleteAccountsOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteAccountsOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -466,14 +473,14 @@ LedgerTxnRoot::Impl::dropAccounts(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS accounts;"; - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS signers;"; + mApp.getDatabase().getRawSession() << "DROP TABLE IF EXISTS accounts;"; + mApp.getDatabase().getRawSession() << "DROP TABLE IF EXISTS signers;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() + mApp.getDatabase().getRawSession() << "CREATE TABLE accounts" << "(" << "accountid VARCHAR(56) " << coll << " PRIMARY KEY," @@ -494,9 +501,10 @@ LedgerTxnRoot::Impl::dropAccounts(bool rebuild) ");"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() << "ALTER TABLE accounts " - << "ALTER COLUMN accountid " - << "TYPE VARCHAR(56) COLLATE \"C\""; + mApp.getDatabase().getRawSession() + << "ALTER TABLE accounts " + << "ALTER COLUMN accountid " + << "TYPE VARCHAR(56) COLLATE \"C\""; } } } @@ -506,6 +514,7 @@ class BulkLoadAccountsOperation { Database& mDb; std::vector mAccountIDs; + SessionWrapper& mSession; std::vector executeAndFetch(soci::statement& st) @@ -592,8 +601,9 @@ class BulkLoadAccountsOperation } public: - BulkLoadAccountsOperation(Database& db, UnorderedSet const& keys) - : mDb(db) + BulkLoadAccountsOperation(Database& db, UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mAccountIDs.reserve(keys.size()); for (auto const& k : keys) @@ -620,7 +630,7 @@ class BulkLoadAccountsOperation " FROM accounts " "WHERE accountid IN carray(?, ?, 'char*')"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -651,7 +661,7 @@ class BulkLoadAccountsOperation " FROM accounts " "WHERE accountid IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); return executeAndFetch(st); @@ -666,9 +676,10 @@ LedgerTxnRoot::Impl::bulkLoadAccounts(UnorderedSet const& keys) const ZoneValue(static_cast(keys.size())); if (!keys.empty()) { - BulkLoadAccountsOperation op(mApp.getDatabase(), keys); + BulkLoadAccountsOperation op(mApp.getDatabase(), keys, getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { diff --git a/src/ledger/LedgerTxnClaimableBalanceSQL.cpp b/src/ledger/LedgerTxnClaimableBalanceSQL.cpp index e952589209..c15708a38c 100644 --- a/src/ledger/LedgerTxnClaimableBalanceSQL.cpp +++ b/src/ledger/LedgerTxnClaimableBalanceSQL.cpp @@ -22,7 +22,7 @@ LedgerTxnRoot::Impl::loadClaimableBalance(LedgerKey const& key) const std::string sql = "SELECT ledgerentry " "FROM claimablebalance " "WHERE balanceid= :balanceid"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(claimableBalanceEntryStr)); st.exchange(soci::use(balanceID)); @@ -44,6 +44,7 @@ class BulkLoadClaimableBalanceOperation { Database& mDb; std::vector mBalanceIDs; + SessionWrapper& mSession; std::vector executeAndFetch(soci::statement& st) @@ -74,8 +75,9 @@ class BulkLoadClaimableBalanceOperation public: BulkLoadClaimableBalanceOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mBalanceIDs.reserve(keys.size()); for (auto const& k : keys) @@ -101,7 +103,7 @@ class BulkLoadClaimableBalanceOperation "FROM claimablebalance " "WHERE balanceid IN r"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -129,7 +131,7 @@ class BulkLoadClaimableBalanceOperation "FROM claimablebalance " "WHERE balanceid IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strBalanceIDs)); return executeAndFetch(st); @@ -143,9 +145,11 @@ LedgerTxnRoot::Impl::bulkLoadClaimableBalance( { if (!keys.empty()) { - BulkLoadClaimableBalanceOperation op(mApp.getDatabase(), keys); + BulkLoadClaimableBalanceOperation op(mApp.getDatabase(), keys, + getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -159,12 +163,13 @@ class BulkDeleteClaimableBalanceOperation Database& mDb; LedgerTxnConsistency mCons; std::vector mBalanceIDs; + SessionWrapper& mSession; public: BulkDeleteClaimableBalanceOperation( Database& db, LedgerTxnConsistency cons, - std::vector const& entries) - : mDb(db), mCons(cons) + std::vector const& entries, SessionWrapper& session) + : mDb(db), mCons(cons), mSession(session) { mBalanceIDs.reserve(entries.size()); for (auto const& e : entries) @@ -180,7 +185,7 @@ class BulkDeleteClaimableBalanceOperation doSociGenericOperation() { std::string sql = "DELETE FROM claimablebalance WHERE balanceid = :id"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(mBalanceIDs)); st.define_and_bind(); @@ -212,7 +217,7 @@ class BulkDeleteClaimableBalanceOperation "DELETE FROM claimablebalance " "WHERE balanceid IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strBalanceIDs)); st.define_and_bind(); @@ -233,8 +238,9 @@ void LedgerTxnRoot::Impl::bulkDeleteClaimableBalance( std::vector const& entries, LedgerTxnConsistency cons) { - BulkDeleteClaimableBalanceOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteClaimableBalanceOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } class BulkUpsertClaimableBalanceOperation @@ -244,6 +250,7 @@ class BulkUpsertClaimableBalanceOperation std::vector mBalanceIDs; std::vector mClaimableBalanceEntrys; std::vector mLastModifieds; + SessionWrapper& mSession; void accumulateEntry(LedgerEntry const& entry) @@ -258,8 +265,9 @@ class BulkUpsertClaimableBalanceOperation public: BulkUpsertClaimableBalanceOperation( - Database& Db, std::vector const& entryIter) - : mDb(Db) + Database& Db, std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -280,7 +288,7 @@ class BulkUpsertClaimableBalanceOperation "excluded.ledgerentry, lastmodified = " "excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mBalanceIDs)); st.exchange(soci::use(mClaimableBalanceEntrys)); @@ -325,7 +333,7 @@ class BulkUpsertClaimableBalanceOperation "excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strBalanceIDs)); st.exchange(soci::use(strClaimableBalanceEntry)); @@ -347,8 +355,9 @@ void LedgerTxnRoot::Impl::bulkUpsertClaimableBalance( std::vector const& entries) { - BulkUpsertClaimableBalanceOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertClaimableBalanceOperation op(mApp.getDatabase(), entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -358,12 +367,13 @@ LedgerTxnRoot::Impl::dropClaimableBalances(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS claimablebalance;"; + mApp.getDatabase().getRawSession() + << "DROP TABLE IF EXISTS claimablebalance;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() + mApp.getDatabase().getRawSession() << "CREATE TABLE claimablebalance (" << "balanceid VARCHAR(48) " << coll << " PRIMARY KEY, " << "ledgerentry TEXT NOT NULL, " diff --git a/src/ledger/LedgerTxnConfigSettingSQL.cpp b/src/ledger/LedgerTxnConfigSettingSQL.cpp index d06282e203..0db0098714 100644 --- a/src/ledger/LedgerTxnConfigSettingSQL.cpp +++ b/src/ledger/LedgerTxnConfigSettingSQL.cpp @@ -30,7 +30,7 @@ LedgerTxnRoot::Impl::loadConfigSetting(LedgerKey const& key) const std::string sql = "SELECT ledgerentry " "FROM configsettings " "WHERE configsettingid = :configsettingid"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(configSettingEntryStr)); st.exchange(soci::use(configSettingID)); @@ -55,6 +55,7 @@ class bulkLoadConfigSettingsOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mConfigSettingIDs; std::vector @@ -85,8 +86,9 @@ class bulkLoadConfigSettingsOperation public: bulkLoadConfigSettingsOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mConfigSettingIDs.reserve(keys.size()); for (auto const& k : keys) @@ -104,7 +106,7 @@ class bulkLoadConfigSettingsOperation "FROM configsettings " "WHERE configsettingid IN r"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -133,7 +135,7 @@ class bulkLoadConfigSettingsOperation "FROM configsettings " "WHERE configsettingid IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strConfigSettingIDs)); return executeAndFetch(st); @@ -147,9 +149,11 @@ LedgerTxnRoot::Impl::bulkLoadConfigSettings( { if (!keys.empty()) { - bulkLoadConfigSettingsOperation op(mApp.getDatabase(), keys); + bulkLoadConfigSettingsOperation op(mApp.getDatabase(), keys, + getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -161,6 +165,7 @@ class bulkUpsertConfigSettingsOperation : public DatabaseTypeSpecificOperation { Database& mDb; + SessionWrapper& mSession; std::vector mConfigSettingIDs; std::vector mConfigSettingEntries; std::vector mLastModifieds; @@ -179,8 +184,9 @@ class bulkUpsertConfigSettingsOperation public: bulkUpsertConfigSettingsOperation( - Database& Db, std::vector const& entryIter) - : mDb(Db) + Database& Db, std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -200,7 +206,7 @@ class bulkUpsertConfigSettingsOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mConfigSettingIDs)); st.exchange(soci::use(mConfigSettingEntries)); @@ -245,7 +251,7 @@ class bulkUpsertConfigSettingsOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strConfigSettingIDs)); st.exchange(soci::use(strConfigSettingEntries)); @@ -268,8 +274,9 @@ void LedgerTxnRoot::Impl::bulkUpsertConfigSettings( std::vector const& entries) { - bulkUpsertConfigSettingsOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + bulkUpsertConfigSettingsOperation op(mApp.getDatabase(), entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -279,16 +286,15 @@ LedgerTxnRoot::Impl::dropConfigSettings(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS configsettings;"; + getSession().session() << "DROP TABLE IF EXISTS configsettings;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() - << "CREATE TABLE configsettings (" - << "configsettingid INT PRIMARY KEY, " - << "ledgerentry TEXT " << coll << " NOT NULL, " - << "lastmodified INT NOT NULL);"; + getSession().session() << "CREATE TABLE configsettings (" + << "configsettingid INT PRIMARY KEY, " + << "ledgerentry TEXT " << coll << " NOT NULL, " + << "lastmodified INT NOT NULL);"; } } } \ No newline at end of file diff --git a/src/ledger/LedgerTxnContractCodeSQL.cpp b/src/ledger/LedgerTxnContractCodeSQL.cpp index 0421e8996c..38fdbfae18 100644 --- a/src/ledger/LedgerTxnContractCodeSQL.cpp +++ b/src/ledger/LedgerTxnContractCodeSQL.cpp @@ -30,7 +30,7 @@ LedgerTxnRoot::Impl::loadContractCode(LedgerKey const& k) const std::string sql = "SELECT ledgerentry " "FROM contractcode " "WHERE hash = :hash"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(contractCodeEntryStr)); st.exchange(soci::use(hash)); @@ -55,6 +55,7 @@ class BulkLoadContractCodeOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mHashes; std::vector @@ -85,8 +86,9 @@ class BulkLoadContractCodeOperation public: BulkLoadContractCodeOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mHashes.reserve(keys.size()); for (auto const& k : keys) @@ -109,7 +111,7 @@ class BulkLoadContractCodeOperation "FROM contractcode " "WHERE hash IN carray(?, ?, 'char*')"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -137,7 +139,7 @@ class BulkLoadContractCodeOperation "FROM contractcode " "WHERE (hash) IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strHashes)); return executeAndFetch(st); @@ -151,9 +153,11 @@ LedgerTxnRoot::Impl::bulkLoadContractCode( { if (!keys.empty()) { - BulkLoadContractCodeOperation op(mApp.getDatabase(), keys); + BulkLoadContractCodeOperation op(mApp.getDatabase(), keys, + getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -166,12 +170,14 @@ class BulkDeleteContractCodeOperation { Database& mDb; LedgerTxnConsistency mCons; + SessionWrapper& mSession; std::vector mHashes; public: BulkDeleteContractCodeOperation(Database& db, LedgerTxnConsistency cons, - std::vector const& entries) - : mDb(db), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDb(db), mCons(cons), mSession(session) { mHashes.reserve(entries.size()); for (auto const& e : entries) @@ -187,7 +193,7 @@ class BulkDeleteContractCodeOperation doSociGenericOperation() { std::string sql = "DELETE FROM contractcode WHERE hash = :id"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(mHashes)); st.define_and_bind(); @@ -219,7 +225,7 @@ class BulkDeleteContractCodeOperation "DELETE FROM contractcode " "WHERE hash IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strHashes)); st.define_and_bind(); @@ -240,8 +246,9 @@ void LedgerTxnRoot::Impl::bulkDeleteContractCode( std::vector const& entries, LedgerTxnConsistency cons) { - BulkDeleteContractCodeOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteContractCodeOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } class BulkUpsertContractCodeOperation @@ -251,6 +258,7 @@ class BulkUpsertContractCodeOperation std::vector mHashes; std::vector mContractCodeEntries; std::vector mLastModifieds; + SessionWrapper& mSession; void accumulateEntry(LedgerEntry const& entry) @@ -265,8 +273,9 @@ class BulkUpsertContractCodeOperation public: BulkUpsertContractCodeOperation(Database& Db, - std::vector const& entryIter) - : mDb(Db) + std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -286,7 +295,7 @@ class BulkUpsertContractCodeOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mHashes)); st.exchange(soci::use(mContractCodeEntries)); @@ -329,7 +338,7 @@ class BulkUpsertContractCodeOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strHashes)); st.exchange(soci::use(strContractCodeEntries)); @@ -351,8 +360,9 @@ void LedgerTxnRoot::Impl::bulkUpsertContractCode( std::vector const& entries) { - BulkUpsertContractCodeOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertContractCodeOperation op(mApp.getDatabase(), entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -364,21 +374,20 @@ LedgerTxnRoot::Impl::dropContractCode(bool rebuild) std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS contractcode;"; + getSession().session() << "DROP TABLE IF EXISTS contractcode;"; if (rebuild) { - mApp.getDatabase().getSession() - << "CREATE TABLE contractcode (" - << "hash TEXT " << coll << " NOT NULL, " - << "ledgerentry TEXT " << coll << " NOT NULL, " - << "lastmodified INT NOT NULL, " - << "PRIMARY KEY (hash));"; + getSession().session() << "CREATE TABLE contractcode (" + << "hash TEXT " << coll << " NOT NULL, " + << "ledgerentry TEXT " << coll << " NOT NULL, " + << "lastmodified INT NOT NULL, " + << "PRIMARY KEY (hash));"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() << "ALTER TABLE contractcode " - << "ALTER COLUMN hash " - << "TYPE TEXT COLLATE \"C\";"; + getSession().session() << "ALTER TABLE contractcode " + << "ALTER COLUMN hash " + << "TYPE TEXT COLLATE \"C\";"; } } } diff --git a/src/ledger/LedgerTxnContractDataSQL.cpp b/src/ledger/LedgerTxnContractDataSQL.cpp index a7f716a561..ab1f6ea779 100644 --- a/src/ledger/LedgerTxnContractDataSQL.cpp +++ b/src/ledger/LedgerTxnContractDataSQL.cpp @@ -33,7 +33,7 @@ LedgerTxnRoot::Impl::loadContractData(LedgerKey const& k) const "SELECT ledgerentry " "FROM contractdata " "WHERE contractID = :contractID AND key = :key AND type = :type"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(contractDataEntryStr)); st.exchange(soci::use(contractID)); @@ -60,6 +60,7 @@ class BulkLoadContractDataOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mContractIDs; std::vector mKeys; std::vector mTypes; @@ -92,8 +93,9 @@ class BulkLoadContractDataOperation public: BulkLoadContractDataOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mContractIDs.reserve(keys.size()); mKeys.reserve(keys.size()); @@ -142,7 +144,7 @@ class BulkLoadContractDataOperation "FROM contractdata " "WHERE (contractid, key, type) IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -177,7 +179,7 @@ class BulkLoadContractDataOperation "FROM contractdata " "WHERE (contractid, key, type) IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strContractIDs)); st.exchange(soci::use(strKeys)); @@ -193,9 +195,11 @@ LedgerTxnRoot::Impl::bulkLoadContractData( { if (!keys.empty()) { - BulkLoadContractDataOperation op(mApp.getDatabase(), keys); + BulkLoadContractDataOperation op(mApp.getDatabase(), keys, + getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -208,14 +212,16 @@ class BulkDeleteContractDataOperation { Database& mDb; LedgerTxnConsistency mCons; + SessionWrapper& mSession; std::vector mContractIDs; std::vector mKeys; std::vector mTypes; public: BulkDeleteContractDataOperation(Database& db, LedgerTxnConsistency cons, - std::vector const& entries) - : mDb(db), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDb(db), mCons(cons), mSession(session) { mContractIDs.reserve(entries.size()); for (auto const& e : entries) @@ -235,7 +241,7 @@ class BulkDeleteContractDataOperation { std::string sql = "DELETE FROM contractdata WHERE contractid = :id " "AND key = :key AND type = :type"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(mContractIDs)); st.exchange(soci::use(mKeys)); @@ -273,7 +279,7 @@ class BulkDeleteContractDataOperation "DELETE FROM contractdata " "WHERE (contractid, key, type) IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strContractIDs)); st.exchange(soci::use(strKeys)); @@ -297,14 +303,16 @@ void LedgerTxnRoot::Impl::bulkDeleteContractData( std::vector const& entries, LedgerTxnConsistency cons) { - BulkDeleteContractDataOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteContractDataOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } class BulkUpsertContractDataOperation : public DatabaseTypeSpecificOperation { Database& mDb; + SessionWrapper& mSession; std::vector mContractIDs; std::vector mKeys; std::vector mTypes; @@ -327,8 +335,9 @@ class BulkUpsertContractDataOperation public: BulkUpsertContractDataOperation(Database& Db, - std::vector const& entryIter) - : mDb(Db) + std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -348,7 +357,7 @@ class BulkUpsertContractDataOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mContractIDs)); st.exchange(soci::use(mKeys)); @@ -397,7 +406,7 @@ class BulkUpsertContractDataOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strContractIDs)); st.exchange(soci::use(strKeys)); @@ -421,8 +430,9 @@ void LedgerTxnRoot::Impl::bulkUpsertContractData( std::vector const& entries) { - BulkUpsertContractDataOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertContractDataOperation op(mApp.getDatabase(), entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -432,28 +442,27 @@ LedgerTxnRoot::Impl::dropContractData(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS contractdata;"; + getSession().session() << "DROP TABLE IF EXISTS contractdata;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() - << "CREATE TABLE contractdata (" - << "contractid TEXT " << coll << " NOT NULL, " - << "key TEXT " << coll << " NOT NULL, " - << "type INT NOT NULL, " - << "ledgerentry TEXT " << coll << " NOT NULL, " - << "lastmodified INT NOT NULL, " - << "PRIMARY KEY (contractid, key, type));"; + getSession().session() << "CREATE TABLE contractdata (" + << "contractid TEXT " << coll << " NOT NULL, " + << "key TEXT " << coll << " NOT NULL, " + << "type INT NOT NULL, " + << "ledgerentry TEXT " << coll << " NOT NULL, " + << "lastmodified INT NOT NULL, " + << "PRIMARY KEY (contractid, key, type));"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() << "ALTER TABLE contractdata " - << "ALTER COLUMN contractid " - << "TYPE TEXT COLLATE \"C\"," - << "ALTER COLUMN key " - << "TYPE TEXT COLLATE \"C\"," - << "ALTER COLUMN type " - << "TYPE INT;"; + getSession().session() << "ALTER TABLE contractdata " + << "ALTER COLUMN contractid " + << "TYPE TEXT COLLATE \"C\"," + << "ALTER COLUMN key " + << "TYPE TEXT COLLATE \"C\"," + << "ALTER COLUMN type " + << "TYPE INT;"; } } } diff --git a/src/ledger/LedgerTxnDataSQL.cpp b/src/ledger/LedgerTxnDataSQL.cpp index a17a38b208..adec74916b 100644 --- a/src/ledger/LedgerTxnDataSQL.cpp +++ b/src/ledger/LedgerTxnDataSQL.cpp @@ -40,7 +40,7 @@ LedgerTxnRoot::Impl::loadData(LedgerKey const& key) const "ledgerext " "FROM accountdata " "WHERE accountid= :id AND dataname= :dataname"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(dataValue, dataValueIndicator)); st.exchange(soci::into(le.lastModifiedLedgerSeq)); @@ -74,6 +74,7 @@ LedgerTxnRoot::Impl::loadData(LedgerKey const& key) const class BulkUpsertDataOperation : public DatabaseTypeSpecificOperation { Database& mDB; + SessionWrapper& mSession; std::vector mAccountIDs; std::vector mDataNames; std::vector mDataValues; @@ -99,8 +100,9 @@ class BulkUpsertDataOperation : public DatabaseTypeSpecificOperation public: BulkUpsertDataOperation(Database& DB, - std::vector const& entries) - : mDB(DB) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mSession(session) { for (auto const& e : entries) { @@ -109,8 +111,9 @@ class BulkUpsertDataOperation : public DatabaseTypeSpecificOperation } BulkUpsertDataOperation(Database& DB, - std::vector const& entryIter) - : mDB(DB) + std::vector const& entryIter, + SessionWrapper& session) + : mDB(DB), mSession(session) { for (auto const& e : entryIter) { @@ -135,7 +138,7 @@ class BulkUpsertDataOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.exchange(soci::use(mDataNames)); @@ -191,7 +194,7 @@ class BulkUpsertDataOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strDataNames)); @@ -216,13 +219,15 @@ class BulkDeleteDataOperation : public DatabaseTypeSpecificOperation { Database& mDB; LedgerTxnConsistency mCons; + SessionWrapper& mSession; std::vector mAccountIDs; std::vector mDataNames; public: BulkDeleteDataOperation(Database& DB, LedgerTxnConsistency cons, - std::vector const& entries) - : mDB(DB), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mCons(cons), mSession(session) { for (auto const& e : entries) { @@ -241,7 +246,7 @@ class BulkDeleteDataOperation : public DatabaseTypeSpecificOperation { std::string sql = "DELETE FROM accountdata WHERE accountid = :id AND " " dataname = :v1 "; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.exchange(soci::use(mDataNames)); @@ -279,7 +284,7 @@ class BulkDeleteDataOperation : public DatabaseTypeSpecificOperation " ) " "DELETE FROM accountdata WHERE (accountid, dataname) IN " "(SELECT * FROM r)"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strDataNames)); @@ -303,8 +308,8 @@ LedgerTxnRoot::Impl::bulkUpsertAccountData( { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkUpsertDataOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertDataOperation op(mApp.getDatabase(), entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -313,8 +318,8 @@ LedgerTxnRoot::Impl::bulkDeleteAccountData( { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkDeleteDataOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteDataOperation op(mApp.getDatabase(), cons, entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -324,12 +329,12 @@ LedgerTxnRoot::Impl::dropData(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS accountdata;"; + getSession().session() << "DROP TABLE IF EXISTS accountdata;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() + getSession().session() << "CREATE TABLE accountdata" << "(" << "accountid VARCHAR(56) " << coll << " NOT NULL," @@ -342,12 +347,11 @@ LedgerTxnRoot::Impl::dropData(bool rebuild) ");"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() - << "ALTER TABLE accountdata " - << "ALTER COLUMN accountid " - << "TYPE VARCHAR(56) COLLATE \"C\", " - << "ALTER COLUMN dataname " - << "TYPE VARCHAR(88) COLLATE \"C\""; + getSession().session() << "ALTER TABLE accountdata " + << "ALTER COLUMN accountid " + << "TYPE VARCHAR(56) COLLATE \"C\", " + << "ALTER COLUMN dataname " + << "TYPE VARCHAR(88) COLLATE \"C\""; } } } @@ -356,6 +360,7 @@ class BulkLoadDataOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mAccountIDs; std::vector mDataNames; @@ -404,8 +409,9 @@ class BulkLoadDataOperation } public: - BulkLoadDataOperation(Database& db, UnorderedSet const& keys) - : mDb(db) + BulkLoadDataOperation(Database& db, UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mAccountIDs.reserve(keys.size()); mDataNames.reserve(keys.size()); @@ -444,7 +450,7 @@ class BulkLoadDataOperation "ledgerext " "FROM accountdata WHERE (accountid, dataname) IN r"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -479,7 +485,7 @@ class BulkLoadDataOperation "ledgerext " "FROM accountdata WHERE (accountid, dataname) IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strDataNames)); @@ -495,9 +501,10 @@ LedgerTxnRoot::Impl::bulkLoadData(UnorderedSet const& keys) const ZoneValue(static_cast(keys.size())); if (!keys.empty()) { - BulkLoadDataOperation op(mApp.getDatabase(), keys); + BulkLoadDataOperation op(mApp.getDatabase(), keys, getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { diff --git a/src/ledger/LedgerTxnImpl.h b/src/ledger/LedgerTxnImpl.h index 4d71595f70..72ee43f24a 100644 --- a/src/ledger/LedgerTxnImpl.h +++ b/src/ledger/LedgerTxnImpl.h @@ -732,6 +732,8 @@ class LedgerTxnRoot::Impl size_t const mMaxBestOffersBatchSize; Application& mApp; + std::shared_ptr mSession; + std::unique_ptr mHeader; mutable EntryCache mEntryCache; mutable BestOffers mBestOffers; @@ -912,6 +914,7 @@ class LedgerTxnRoot::Impl void dropContractCode(bool rebuild); void dropConfigSettings(bool rebuild); void dropTTL(bool rebuild); + SessionWrapper& getSession() const; #ifdef BUILD_TESTS void resetForFuzzer(); diff --git a/src/ledger/LedgerTxnLiquidityPoolSQL.cpp b/src/ledger/LedgerTxnLiquidityPoolSQL.cpp index ce8289b284..2fcc5dc36d 100644 --- a/src/ledger/LedgerTxnLiquidityPoolSQL.cpp +++ b/src/ledger/LedgerTxnLiquidityPoolSQL.cpp @@ -39,7 +39,7 @@ LedgerTxnRoot::Impl::loadLiquidityPool(LedgerKey const& key) const std::string sql = "SELECT ledgerentry " "FROM liquiditypool " "WHERE poolasset= :poolasset"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(liquidityPoolEntryStr)); st.exchange(soci::use(poolAsset)); @@ -64,6 +64,7 @@ class BulkLoadLiquidityPoolOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mPoolAssets; std::vector @@ -94,8 +95,9 @@ class BulkLoadLiquidityPoolOperation public: BulkLoadLiquidityPoolOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mPoolAssets.reserve(keys.size()); for (auto const& k : keys) @@ -121,7 +123,7 @@ class BulkLoadLiquidityPoolOperation "FROM liquiditypool " "WHERE poolasset IN r"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -149,7 +151,7 @@ class BulkLoadLiquidityPoolOperation "FROM liquiditypool " "WHERE poolasset IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strPoolAssets)); return executeAndFetch(st); @@ -163,9 +165,11 @@ LedgerTxnRoot::Impl::bulkLoadLiquidityPool( { if (!keys.empty()) { - BulkLoadLiquidityPoolOperation op(mApp.getDatabase(), keys); + BulkLoadLiquidityPoolOperation op(mApp.getDatabase(), keys, + getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -178,12 +182,14 @@ class BulkDeleteLiquidityPoolOperation { Database& mDb; LedgerTxnConsistency mCons; + SessionWrapper& mSession; std::vector mPoolAssets; public: BulkDeleteLiquidityPoolOperation(Database& db, LedgerTxnConsistency cons, - std::vector const& entries) - : mDb(db), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDb(db), mCons(cons), mSession(session) { mPoolAssets.reserve(entries.size()); for (auto const& e : entries) @@ -199,7 +205,7 @@ class BulkDeleteLiquidityPoolOperation doSociGenericOperation() { std::string sql = "DELETE FROM liquiditypool WHERE poolasset = :id"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(mPoolAssets)); st.define_and_bind(); @@ -231,7 +237,7 @@ class BulkDeleteLiquidityPoolOperation "DELETE FROM liquiditypool " "WHERE poolasset IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strPoolAssets)); st.define_and_bind(); @@ -252,14 +258,16 @@ void LedgerTxnRoot::Impl::bulkDeleteLiquidityPool( std::vector const& entries, LedgerTxnConsistency cons) { - BulkDeleteLiquidityPoolOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteLiquidityPoolOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } class BulkUpsertLiquidityPoolOperation : public DatabaseTypeSpecificOperation { Database& mDb; + SessionWrapper& mSession; std::vector mPoolAssets; std::vector mAssetAs; std::vector mAssetBs; @@ -283,8 +291,9 @@ class BulkUpsertLiquidityPoolOperation public: BulkUpsertLiquidityPoolOperation( - Database& Db, std::vector const& entryIter) - : mDb(Db) + Database& Db, std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -307,7 +316,7 @@ class BulkUpsertLiquidityPoolOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mPoolAssets)); st.exchange(soci::use(mAssetAs)); @@ -359,7 +368,7 @@ class BulkUpsertLiquidityPoolOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strPoolAssets)); st.exchange(soci::use(strAssetAs)); @@ -383,8 +392,9 @@ void LedgerTxnRoot::Impl::bulkUpsertLiquidityPool( std::vector const& entries) { - BulkUpsertLiquidityPoolOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertLiquidityPoolOperation op(mApp.getDatabase(), entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -394,7 +404,7 @@ LedgerTxnRoot::Impl::dropLiquidityPools(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS liquiditypool;"; + getSession().session() << "DROP TABLE IF EXISTS liquiditypool;"; if (rebuild) { @@ -403,17 +413,17 @@ LedgerTxnRoot::Impl::dropLiquidityPools(bool rebuild) // containing the PoolID) instead of poolid (the base-64 opaque PoolID) // so that we can perform the join in load pool share trust lines by // account and asset. - mApp.getDatabase().getSession() + getSession().session() << "CREATE TABLE liquiditypool (" << "poolasset TEXT " << coll << " PRIMARY KEY, " << "asseta TEXT " << coll << " NOT NULL, " << "assetb TEXT " << coll << " NOT NULL, " << "ledgerentry TEXT NOT NULL, " << "lastmodified INT NOT NULL);"; - mApp.getDatabase().getSession() << "CREATE INDEX liquiditypoolasseta " - << "ON liquiditypool(asseta);"; - mApp.getDatabase().getSession() << "CREATE INDEX liquiditypoolassetb " - << "ON liquiditypool(assetb);"; + getSession().session() << "CREATE INDEX liquiditypoolasseta " + << "ON liquiditypool(asseta);"; + getSession().session() << "CREATE INDEX liquiditypoolassetb " + << "ON liquiditypool(assetb);"; } } } diff --git a/src/ledger/LedgerTxnOfferSQL.cpp b/src/ledger/LedgerTxnOfferSQL.cpp index 6481bae9f7..bbf3cc842e 100644 --- a/src/ledger/LedgerTxnOfferSQL.cpp +++ b/src/ledger/LedgerTxnOfferSQL.cpp @@ -39,7 +39,7 @@ LedgerTxnRoot::Impl::loadOffer(LedgerKey const& key) const "ledgerext " "FROM offers " "WHERE sellerid= :id AND offerid= :offerid"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::use(actIDStrKey)); st.exchange(soci::use(offerID)); @@ -61,7 +61,7 @@ LedgerTxnRoot::Impl::loadAllOffers() const std::string sql = "SELECT sellerid, offerid, sellingasset, buyingasset, " "amount, pricen, priced, flags, lastmodified, extension, " "ledgerext FROM offers"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); std::vector offers; { @@ -89,7 +89,7 @@ LedgerTxnRoot::Impl::loadBestOffers(std::deque& offers, buyingAsset = decoder::encode_b64(xdr::xdr_to_opaque(buying)); sellingAsset = decoder::encode_b64(xdr::xdr_to_opaque(selling)); - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::use(sellingAsset)); st.exchange(soci::use(buyingAsset)); @@ -145,7 +145,7 @@ LedgerTxnRoot::Impl::loadBestOffers(std::deque& offers, (double)worseThan.price.n / (double)worseThan.price.d; int64_t worseThanOfferID = worseThan.offerID + 1; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::use(sellingAsset)); st.exchange(soci::use(buyingAsset)); @@ -227,7 +227,7 @@ LedgerTxnRoot::Impl::loadOffersByAccountAndAsset(AccountID const& accountID, } std::string assetStr = decoder::encode_b64(xdr::xdr_to_opaque(asset)); - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::use(accountStr)); st.exchange(soci::use(assetStr)); @@ -327,6 +327,7 @@ LedgerTxnRoot::Impl::loadOffers(StatementContext& prep) const class BulkUpsertOffersOperation : public DatabaseTypeSpecificOperation { Database& mDB; + SessionWrapper& mSession; std::vector mSellerIDs; std::vector mOfferIDs; std::vector mSellingAssets; @@ -371,8 +372,9 @@ class BulkUpsertOffersOperation : public DatabaseTypeSpecificOperation public: BulkUpsertOffersOperation(Database& DB, - std::vector const& entries) - : mDB(DB) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mSession(session) { mSellerIDs.reserve(entries.size()); mOfferIDs.reserve(entries.size()); @@ -394,8 +396,9 @@ class BulkUpsertOffersOperation : public DatabaseTypeSpecificOperation } BulkUpsertOffersOperation(Database& DB, - std::vector const& entries) - : mDB(DB) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mSession(session) { mSellerIDs.reserve(entries.size()); mOfferIDs.reserve(entries.size()); @@ -441,7 +444,7 @@ class BulkUpsertOffersOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mSellerIDs)); st.exchange(soci::use(mOfferIDs)); @@ -529,7 +532,7 @@ class BulkUpsertOffersOperation : public DatabaseTypeSpecificOperation "lastmodified = excluded.lastmodified, " "extension = excluded.extension, " "ledgerext = excluded.ledgerext"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strSellerIDs)); st.exchange(soci::use(strOfferIDs)); @@ -560,12 +563,14 @@ class BulkDeleteOffersOperation : public DatabaseTypeSpecificOperation { Database& mDB; LedgerTxnConsistency mCons; + SessionWrapper& mSession; std::vector mOfferIDs; public: BulkDeleteOffersOperation(Database& DB, LedgerTxnConsistency cons, - std::vector const& entries) - : mDB(DB), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDB(DB), mCons(cons), mSession(session) { for (auto const& e : entries) { @@ -582,7 +587,7 @@ class BulkDeleteOffersOperation : public DatabaseTypeSpecificOperation doSociGenericOperation() { std::string sql = "DELETE FROM offers WHERE offerid = :id"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mOfferIDs)); st.define_and_bind(); @@ -615,7 +620,7 @@ class BulkDeleteOffersOperation : public DatabaseTypeSpecificOperation ") " "DELETE FROM offers WHERE " "offerid IN (SELECT * FROM r)"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strOfferIDs)); st.define_and_bind(); @@ -637,8 +642,8 @@ LedgerTxnRoot::Impl::bulkUpsertOffers(std::vector const& entries) { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkUpsertOffersOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertOffersOperation op(mApp.getDatabase(), entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -647,8 +652,9 @@ LedgerTxnRoot::Impl::bulkDeleteOffers(std::vector const& entries, { ZoneScoped; ZoneValue(static_cast(entries.size())); - BulkDeleteOffersOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteOffersOperation op(mApp.getDatabase(), cons, entries, + getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -658,12 +664,12 @@ LedgerTxnRoot::Impl::dropOffers(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS offers;"; + getSession().session() << "DROP TABLE IF EXISTS offers;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() + getSession().session() << "CREATE TABLE offers" << "(" << "sellerid VARCHAR(56) " << coll << "NOT NULL," @@ -681,22 +687,19 @@ LedgerTxnRoot::Impl::dropOffers(bool rebuild) "ledgerext TEXT NOT NULL," "PRIMARY KEY (offerid)" ");"; - mApp.getDatabase().getSession() - << "CREATE INDEX bestofferindex ON offers " - "(sellingasset,buyingasset,price,offerid);"; - mApp.getDatabase().getSession() - << "CREATE INDEX offerbyseller ON offers " - "(sellerid);"; + getSession().session() << "CREATE INDEX bestofferindex ON offers " + "(sellingasset,buyingasset,price,offerid);"; + getSession().session() << "CREATE INDEX offerbyseller ON offers " + "(sellerid);"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() - << "ALTER TABLE offers " - << "ALTER COLUMN sellerid " - << "TYPE VARCHAR(56) COLLATE \"C\", " - << "ALTER COLUMN buyingasset " - << "TYPE TEXT COLLATE \"C\", " - << "ALTER COLUMN sellingasset " - << "TYPE TEXT COLLATE \"C\""; + getSession().session() << "ALTER TABLE offers " + << "ALTER COLUMN sellerid " + << "TYPE VARCHAR(56) COLLATE \"C\", " + << "ALTER COLUMN buyingasset " + << "TYPE TEXT COLLATE \"C\", " + << "ALTER COLUMN sellingasset " + << "TYPE TEXT COLLATE \"C\""; } } } @@ -705,6 +708,7 @@ class BulkLoadOffersOperation : public DatabaseTypeSpecificOperation> { Database& mDb; + SessionWrapper& mSession; std::vector mOfferIDs; UnorderedSet mKeys; @@ -767,8 +771,9 @@ class BulkLoadOffersOperation } public: - BulkLoadOffersOperation(Database& db, UnorderedSet const& keys) - : mDb(db) + BulkLoadOffersOperation(Database& db, UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mOfferIDs.reserve(keys.size()); for (auto const& k : keys) @@ -790,7 +795,7 @@ class BulkLoadOffersOperation "ledgerext " "FROM offers WHERE offerid IN carray(?, ?, 'int64')"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -819,7 +824,7 @@ class BulkLoadOffersOperation "amount, pricen, priced, flags, lastmodified, extension, " "ledgerext " "FROM offers WHERE offerid IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strOfferIDs)); return executeAndFetch(st); @@ -834,9 +839,10 @@ LedgerTxnRoot::Impl::bulkLoadOffers(UnorderedSet const& keys) const ZoneValue(static_cast(keys.size())); if (!keys.empty()) { - BulkLoadOffersOperation op(mApp.getDatabase(), keys); + BulkLoadOffersOperation op(mApp.getDatabase(), keys, getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { diff --git a/src/ledger/LedgerTxnTTLSQL.cpp b/src/ledger/LedgerTxnTTLSQL.cpp index 363923a14d..b270cd85c6 100644 --- a/src/ledger/LedgerTxnTTLSQL.cpp +++ b/src/ledger/LedgerTxnTTLSQL.cpp @@ -31,7 +31,7 @@ LedgerTxnRoot::Impl::loadTTL(LedgerKey const& key) const std::string sql = "SELECT ledgerentry " "FROM ttl " "WHERE keyhash = :keyHash"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement(sql, getSession()); auto& st = prep.statement(); st.exchange(soci::into(ttlEntryStr)); st.exchange(soci::use(keyHash)); @@ -56,6 +56,7 @@ class BulkLoadTTLOperation { Database& mDb; std::vector mKeyHashes; + SessionWrapper& mSession; std::vector executeAndFetch(soci::statement& st) @@ -84,8 +85,9 @@ class BulkLoadTTLOperation } public: - BulkLoadTTLOperation(Database& db, UnorderedSet const& keys) - : mDb(db) + BulkLoadTTLOperation(Database& db, UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mKeyHashes.reserve(keys.size()); for (auto const& k : keys) @@ -108,7 +110,7 @@ class BulkLoadTTLOperation "FROM ttl " "WHERE keyhash IN carray(?, ?, 'char*')"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -136,7 +138,7 @@ class BulkLoadTTLOperation "FROM ttl " "WHERE (keyHash) IN (SELECT * from r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strKeyHashes)); return executeAndFetch(st); @@ -149,9 +151,10 @@ LedgerTxnRoot::Impl::bulkLoadTTL(UnorderedSet const& keys) const { if (!keys.empty()) { - BulkLoadTTLOperation op(mApp.getDatabase(), keys); + BulkLoadTTLOperation op(mApp.getDatabase(), keys, getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { @@ -164,11 +167,13 @@ class BulkDeleteTTLOperation : public DatabaseTypeSpecificOperation Database& mDb; LedgerTxnConsistency mCons; std::vector mKeyHashes; + SessionWrapper& mSession; public: BulkDeleteTTLOperation(Database& db, LedgerTxnConsistency cons, - std::vector const& entries) - : mDb(db), mCons(cons) + std::vector const& entries, + SessionWrapper& session) + : mDb(db), mCons(cons), mSession(session) { mKeyHashes.reserve(entries.size()); for (auto const& e : entries) @@ -184,7 +189,7 @@ class BulkDeleteTTLOperation : public DatabaseTypeSpecificOperation doSociGenericOperation() { std::string sql = "DELETE FROM ttl WHERE keyhash = :id"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(mKeyHashes)); st.define_and_bind(); @@ -216,7 +221,7 @@ class BulkDeleteTTLOperation : public DatabaseTypeSpecificOperation "DELETE FROM ttl " "WHERE keyHash IN (SELECT * FROM r)"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto& st = prep.statement(); st.exchange(soci::use(strKeyHashes)); st.define_and_bind(); @@ -237,13 +242,14 @@ void LedgerTxnRoot::Impl::bulkDeleteTTL(std::vector const& entries, LedgerTxnConsistency cons) { - BulkDeleteTTLOperation op(mApp.getDatabase(), cons, entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkDeleteTTLOperation op(mApp.getDatabase(), cons, entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } class BulkUpsertTTLOperation : public DatabaseTypeSpecificOperation { Database& mDb; + SessionWrapper& mSession; std::vector mKeyHashes; std::vector mTTLEntries; std::vector mLastModifieds; @@ -261,8 +267,9 @@ class BulkUpsertTTLOperation : public DatabaseTypeSpecificOperation public: BulkUpsertTTLOperation(Database& Db, - std::vector const& entryIter) - : mDb(Db) + std::vector const& entryIter, + SessionWrapper& session) + : mDb(Db), mSession(session) { for (auto const& e : entryIter) { @@ -282,7 +289,7 @@ class BulkUpsertTTLOperation : public DatabaseTypeSpecificOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mKeyHashes)); st.exchange(soci::use(mTTLEntries)); @@ -325,7 +332,7 @@ class BulkUpsertTTLOperation : public DatabaseTypeSpecificOperation "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strKeyHashes)); st.exchange(soci::use(strTTLEntries)); @@ -346,8 +353,8 @@ class BulkUpsertTTLOperation : public DatabaseTypeSpecificOperation void LedgerTxnRoot::Impl::bulkUpsertTTL(std::vector const& entries) { - BulkUpsertTTLOperation op(mApp.getDatabase(), entries); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + BulkUpsertTTLOperation op(mApp.getDatabase(), entries, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -359,11 +366,11 @@ LedgerTxnRoot::Impl::dropTTL(bool rebuild) std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS ttl;"; + mApp.getDatabase().getRawSession() << "DROP TABLE IF EXISTS ttl;"; if (rebuild) { - mApp.getDatabase().getSession() + mApp.getDatabase().getRawSession() << "CREATE TABLE ttl (" << "keyhash TEXT " << coll << " NOT NULL, " << "ledgerentry TEXT " << coll << " NOT NULL, " @@ -371,9 +378,9 @@ LedgerTxnRoot::Impl::dropTTL(bool rebuild) << "PRIMARY KEY (keyhash));"; if (!mApp.getDatabase().isSqlite()) { - mApp.getDatabase().getSession() << "ALTER TABLE ttl " - << "ALTER COLUMN keyhash " - << "TYPE TEXT COLLATE \"C\";"; + mApp.getDatabase().getRawSession() << "ALTER TABLE ttl " + << "ALTER COLUMN keyhash " + << "TYPE TEXT COLLATE \"C\";"; } } } diff --git a/src/ledger/LedgerTxnTrustLineSQL.cpp b/src/ledger/LedgerTxnTrustLineSQL.cpp index 78631cd25a..0375c20861 100644 --- a/src/ledger/LedgerTxnTrustLineSQL.cpp +++ b/src/ledger/LedgerTxnTrustLineSQL.cpp @@ -53,7 +53,8 @@ LedgerTxnRoot::Impl::loadTrustLine(LedgerKey const& key) const auto prep = mApp.getDatabase().getPreparedStatement( "SELECT ledgerentry " " FROM trustlines " - "WHERE accountid= :id AND asset= :asset"); + "WHERE accountid= :id AND asset= :asset", + getSession()); auto& st = prep.statement(); st.exchange(soci::into(trustLineEntryStr)); st.exchange(soci::use(accountIDStr)); @@ -95,7 +96,8 @@ LedgerTxnRoot::Impl::loadPoolShareTrustLinesByAccountAndAsset( "INNER JOIN liquiditypool " "ON trustlines.asset = liquiditypool.poolasset " "AND trustlines.accountid = :v1 " - "AND (liquiditypool.asseta = :v2 OR liquiditypool.assetb = :v3)"); + "AND (liquiditypool.asseta = :v2 OR liquiditypool.assetb = :v3)", + getSession()); auto& st = prep.statement(); st.exchange(soci::into(trustLineEntryStr)); st.exchange(soci::use(accountIDStr)); @@ -128,12 +130,14 @@ class BulkUpsertTrustLinesOperation : public DatabaseTypeSpecificOperation std::vector mAssets; std::vector mTrustLineEntries; std::vector mLastModifieds; + SessionWrapper& mSession; public: BulkUpsertTrustLinesOperation(Database& DB, std::vector const& entries, - uint32_t ledgerVersion) - : mDB(DB) + uint32_t ledgerVersion, + SessionWrapper& session) + : mDB(DB), mSession(session) { mAccountIDs.reserve(entries.size()); mAssets.reserve(entries.size()); @@ -170,7 +174,7 @@ class BulkUpsertTrustLinesOperation : public DatabaseTypeSpecificOperation ") ON CONFLICT (accountid, asset) DO UPDATE SET " "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.exchange(soci::use(mAssets)); @@ -218,7 +222,7 @@ class BulkUpsertTrustLinesOperation : public DatabaseTypeSpecificOperation "ON CONFLICT (accountid, asset) DO UPDATE SET " "ledgerentry = excluded.ledgerentry, " "lastmodified = excluded.lastmodified"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strAssets)); @@ -243,12 +247,14 @@ class BulkDeleteTrustLinesOperation : public DatabaseTypeSpecificOperation LedgerTxnConsistency mCons; std::vector mAccountIDs; std::vector mAssets; + SessionWrapper& mSession; public: BulkDeleteTrustLinesOperation(Database& DB, LedgerTxnConsistency cons, std::vector const& entries, - uint32_t ledgerVersion) - : mDB(DB), mCons(cons) + uint32_t ledgerVersion, + SessionWrapper& session) + : mDB(DB), mCons(cons), mSession(session) { mAccountIDs.reserve(entries.size()); mAssets.reserve(entries.size()); @@ -272,7 +278,7 @@ class BulkDeleteTrustLinesOperation : public DatabaseTypeSpecificOperation { std::string sql = "DELETE FROM trustlines WHERE accountid = :id " "AND asset = :v1"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(mAccountIDs)); st.exchange(soci::use(mAssets)); @@ -308,7 +314,7 @@ class BulkDeleteTrustLinesOperation : public DatabaseTypeSpecificOperation ") " "DELETE FROM trustlines WHERE " "(accountid, asset) IN (SELECT * FROM r)"; - auto prep = mDB.getPreparedStatement(sql); + auto prep = mDB.getPreparedStatement(sql, mSession); soci::statement& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strAssets)); @@ -333,8 +339,8 @@ LedgerTxnRoot::Impl::bulkUpsertTrustLines( ZoneScoped; ZoneValue(static_cast(entries.size())); BulkUpsertTrustLinesOperation op(mApp.getDatabase(), entries, - mHeader->ledgerVersion); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + mHeader->ledgerVersion, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -344,8 +350,8 @@ LedgerTxnRoot::Impl::bulkDeleteTrustLines( ZoneScoped; ZoneValue(static_cast(entries.size())); BulkDeleteTrustLinesOperation op(mApp.getDatabase(), cons, entries, - mHeader->ledgerVersion); - mApp.getDatabase().doDatabaseTypeSpecificOperation(op); + mHeader->ledgerVersion, getSession()); + mApp.getDatabase().doDatabaseTypeSpecificOperation(op, getSession()); } void @@ -355,12 +361,12 @@ LedgerTxnRoot::Impl::dropTrustLines(bool rebuild) mEntryCache.clear(); mBestOffers.clear(); - mApp.getDatabase().getSession() << "DROP TABLE IF EXISTS trustlines;"; + mApp.getDatabase().getRawSession() << "DROP TABLE IF EXISTS trustlines;"; if (rebuild) { std::string coll = mApp.getDatabase().getSimpleCollationClause(); - mApp.getDatabase().getSession() + mApp.getDatabase().getRawSession() << "CREATE TABLE trustlines" << "(" << "accountid VARCHAR(56) " << coll << " NOT NULL," @@ -377,6 +383,7 @@ class BulkLoadTrustLinesOperation Database& mDb; std::vector mAccountIDs; std::vector mAssets; + SessionWrapper& mSession; std::vector executeAndFetch(soci::statement& st) @@ -410,8 +417,9 @@ class BulkLoadTrustLinesOperation public: BulkLoadTrustLinesOperation(Database& db, - UnorderedSet const& keys) - : mDb(db) + UnorderedSet const& keys, + SessionWrapper& session) + : mDb(db), mSession(session) { mAccountIDs.reserve(keys.size()); mAssets.reserve(keys.size()); @@ -457,7 +465,7 @@ class BulkLoadTrustLinesOperation ") SELECT accountid, asset, ledgerentry " "FROM trustlines WHERE (accountid, asset) IN r"; - auto prep = mDb.getPreparedStatement(sql); + auto prep = mDb.getPreparedStatement(sql, mSession); auto be = prep.statement().get_backend(); if (be == nullptr) { @@ -492,7 +500,8 @@ class BulkLoadTrustLinesOperation "ledgerentry " " FROM trustlines " "WHERE (accountid, asset) IN (SELECT * " - "FROM r)"); + "FROM r)", + mSession); auto& st = prep.statement(); st.exchange(soci::use(strAccountIDs)); st.exchange(soci::use(strAssets)); @@ -509,9 +518,10 @@ LedgerTxnRoot::Impl::bulkLoadTrustLines( ZoneValue(static_cast(keys.size())); if (!keys.empty()) { - BulkLoadTrustLinesOperation op(mApp.getDatabase(), keys); + BulkLoadTrustLinesOperation op(mApp.getDatabase(), keys, getSession()); return populateLoadedEntries( - keys, mApp.getDatabase().doDatabaseTypeSpecificOperation(op)); + keys, mApp.getDatabase().doDatabaseTypeSpecificOperation( + op, getSession())); } else { diff --git a/src/main/Application.cpp b/src/main/Application.cpp index 991c11f48a..8ff31b3b4f 100644 --- a/src/main/Application.cpp +++ b/src/main/Application.cpp @@ -4,6 +4,7 @@ #include "Application.h" #include "ApplicationImpl.h" +#include "database/Database.h" #include "util/GlobalChecks.h" #include @@ -21,12 +22,13 @@ validateNetworkPassphrase(Application::pointer app) } auto& persistentState = app->getPersistentState(); - std::string prevNetworkPassphrase = - persistentState.getState(PersistentState::kNetworkPassphrase); + std::string prevNetworkPassphrase = persistentState.getState( + PersistentState::kNetworkPassphrase, app->getDatabase().getSession()); if (prevNetworkPassphrase.empty()) { persistentState.setState(PersistentState::kNetworkPassphrase, - networkPassphrase); + networkPassphrase, + app->getDatabase().getSession()); } else if (networkPassphrase != prevNetworkPassphrase) { diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 639cdb50d8..10617f51d3 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -214,7 +214,7 @@ maybeRebuildLedger(Application& app, bool applyBuckets) if (!app.getConfig().MODE_USES_IN_MEMORY_LEDGER) { app.getDatabase().clearPreparedStatementCache(); - soci::transaction tx(app.getDatabase().getSession()); + soci::transaction tx(app.getDatabase().getRawSession()); auto loopEntries = [&](auto const& entryTypeSet, bool shouldRebuild) { for (auto let : entryTypeSet) @@ -772,7 +772,8 @@ ApplicationImpl::validateAndLogConfig() if (mConfig.DEPRECATED_SQL_LEDGER_STATE) { - if (mPersistentState->getState(PersistentState::kDBBackend) == + if (mPersistentState->getState(PersistentState::kDBBackend, + getDatabase().getSession()) == BucketIndex::DB_BACKEND_STATE) { throw std::invalid_argument( @@ -790,7 +791,8 @@ ApplicationImpl::validateAndLogConfig() if (mConfig.isUsingBucketListDB()) { mPersistentState->setState(PersistentState::kDBBackend, - BucketIndex::DB_BACKEND_STATE); + BucketIndex::DB_BACKEND_STATE, + getDatabase().getSession()); auto pageSizeExp = mConfig.BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT; if (pageSizeExp != 0) { diff --git a/src/main/ApplicationUtils.cpp b/src/main/ApplicationUtils.cpp index 5f33fce2a5..376012804f 100644 --- a/src/main/ApplicationUtils.cpp +++ b/src/main/ApplicationUtils.cpp @@ -318,8 +318,8 @@ applyBucketsForLCL(Application& app, std::function onlyApply) { auto has = app.getLedgerManager().getLastClosedLedgerHAS(); - auto lclHash = - app.getPersistentState().getState(PersistentState::kLastClosedLedger); + auto lclHash = app.getPersistentState().getState( + PersistentState::kLastClosedLedger, app.getDatabase().getSession()); auto maxProtocolVersion = app.getConfig().LEDGER_PROTOCOL_VERSION; auto currentLedger = diff --git a/src/main/ExternalQueue.cpp b/src/main/ExternalQueue.cpp index 61578971cb..511b38f95a 100644 --- a/src/main/ExternalQueue.cpp +++ b/src/main/ExternalQueue.cpp @@ -31,9 +31,9 @@ ExternalQueue::ExternalQueue(Application& app) : mApp(app) void ExternalQueue::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS pubsub;"; + db.getRawMiscSession() << "DROP TABLE IF EXISTS pubsub;"; - soci::statement st = db.getSession().prepare << kSQLCreateStatement; + soci::statement st = db.getRawMiscSession().prepare << kSQLCreateStatement; st.execute(true); } @@ -74,7 +74,8 @@ ExternalQueue::setCursorForResource(std::string const& resid, uint32 cursor) { ZoneNamedN(insertPubsubZone, "insert pubsub", true); auto prep = mApp.getDatabase().getPreparedStatement( - "INSERT INTO pubsub (resid, lastread) VALUES (:n, :v);"); + "INSERT INTO pubsub (resid, lastread) VALUES (:n, :v);", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(resid)); st.exchange(soci::use(cursor)); @@ -88,7 +89,8 @@ ExternalQueue::setCursorForResource(std::string const& resid, uint32 cursor) else { auto prep = mApp.getDatabase().getPreparedStatement( - "UPDATE pubsub SET lastread = :v WHERE resid = :n;"); + "UPDATE pubsub SET lastread = :v WHERE resid = :n;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(cursor)); @@ -114,7 +116,8 @@ ExternalQueue::getCursorForResource(std::string const& resid, auto& db = mApp.getDatabase(); auto prep = - db.getPreparedStatement("SELECT resid, lastread FROM pubsub;"); + db.getPreparedStatement("SELECT resid, lastread FROM pubsub;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::into(n)); st.exchange(soci::into(v)); @@ -151,7 +154,8 @@ ExternalQueue::deleteCursor(std::string const& resid) { ZoneNamedN(deletePubsubZone, "delete pubsub", true); auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM pubsub WHERE resid = :n;"); + "DELETE FROM pubsub WHERE resid = :n;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(resid)); st.define_and_bind(); @@ -166,9 +170,9 @@ ExternalQueue::deleteOldEntries(uint32 count) auto& db = mApp.getDatabase(); int m; soci::indicator minIndicator; - soci::statement st = - (db.getSession().prepare << "SELECT MIN(lastread) FROM pubsub", - soci::into(m, minIndicator)); + soci::statement st = (db.getMiscSession().session().prepare + << "SELECT MIN(lastread) FROM pubsub", + soci::into(m, minIndicator)); { ZoneNamedN(selectPubsubZone, "select pubsub", true); st.execute(true); @@ -224,8 +228,9 @@ ExternalQueue::getCursor(std::string const& resid) std::string res; auto& db = mApp.getDatabase(); - auto prep = db.getPreparedStatement( - "SELECT lastread FROM pubsub WHERE resid = :n;"); + auto prep = + db.getPreparedStatement("SELECT lastread FROM pubsub WHERE resid = :n;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::into(res)); st.exchange(soci::use(resid)); diff --git a/src/main/PersistentState.cpp b/src/main/PersistentState.cpp index 150d3f62ab..efe84acbf2 100644 --- a/src/main/PersistentState.cpp +++ b/src/main/PersistentState.cpp @@ -18,11 +18,13 @@ namespace stellar using namespace std; -std::string PersistentState::mapping[kLastEntry] = { - "lastclosedledger", "historyarchivestate", "lastscpdata", - "databaseschema", "networkpassphrase", "ledgerupgrades", - "rebuildledger", "lastscpdataxdr", "txset", - "dbbackend"}; +std::string PersistentState::mainMapping[kLastEntryMain] = { + "lastclosedledger", "historyarchivestate", "databaseschema", + "networkpassphrase", "rebuildledger", "dbbackend"}; + +std::string PersistentState::miscMapping[kLastEntry] = { + "lastscpdata", "ledgerupgrades", "lastscpdataxdr", "txset", + "miscdatabaseschema"}; std::string PersistentState::kSQLCreateStatement = "CREATE TABLE IF NOT EXISTS storestate (" @@ -37,12 +39,13 @@ PersistentState::PersistentState(Application& app) : mApp(app) void PersistentState::deleteTxSets(std::unordered_set hashesToDelete) { - soci::transaction tx(mApp.getDatabase().getSession()); + soci::transaction tx(mApp.getDatabase().getRawMiscSession()); for (auto const& hash : hashesToDelete) { auto name = getStoreStateNameForTxSet(hash); auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM storestate WHERE statename = :n;"); + "DELETE FROM storestate WHERE statename = :n;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(name)); @@ -55,20 +58,46 @@ PersistentState::deleteTxSets(std::unordered_set hashesToDelete) void PersistentState::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS storestate;"; + auto drop = [](SessionWrapper& session) { + session.session() << "DROP TABLE IF EXISTS storestate;"; + soci::statement st = session.session().prepare << kSQLCreateStatement; + st.execute(true); + }; + + drop(db.getSession()); + if (db.canUsePool() && db.isSqlite()) + { + drop(db.getMiscSession()); + } +} - soci::statement st = db.getSession().prepare << kSQLCreateStatement; +void +PersistentState::dropMisc(Database& db) +{ + db.getRawMiscSession() << "DROP TABLE IF EXISTS storestate;"; + soci::statement st = db.getRawMiscSession().prepare << kSQLCreateStatement; st.execute(true); } std::string -PersistentState::getStoreStateName(PersistentState::Entry n, uint32 subscript) +PersistentState::getStoreStateName(PersistentState::Entry n, + uint32 subscript) const { if (n < 0 || n >= kLastEntry) { throw out_of_range("unknown entry"); } - auto res = mapping[n]; + + std::string res; + if (n < kLastEntryMain) + { + res = mainMapping[n]; + } + else + { + res = miscMapping[n - kLastEntryMain - 1]; + } + if (((n == kLastSCPData || n == kLastSCPDataXDR) && subscript > 0) || n == kRebuildLedger) { @@ -80,7 +109,7 @@ PersistentState::getStoreStateName(PersistentState::Entry n, uint32 subscript) std::string PersistentState::getStoreStateNameForTxSet(Hash const& txSetHash) { - auto res = mapping[kTxSet]; + auto res = miscMapping[kTxSet - kLastEntryMain - 1]; res += binToHex(txSetHash); return res; } @@ -88,22 +117,23 @@ PersistentState::getStoreStateNameForTxSet(Hash const& txSetHash) bool PersistentState::hasTxSet(Hash const& txSetHash) { - return entryExists(getStoreStateNameForTxSet(txSetHash)); + return entryExists(getStoreStateNameForTxSet(txSetHash), + mApp.getDatabase().getMiscSession()); } std::string -PersistentState::getState(PersistentState::Entry entry) +PersistentState::getState(PersistentState::Entry entry, SessionWrapper& session) { ZoneScoped; - return getFromDb(getStoreStateName(entry)); + return getFromDb(getStoreStateName(entry), session); } void PersistentState::setState(PersistentState::Entry entry, - std::string const& value) + std::string const& value, SessionWrapper& session) { ZoneScoped; - updateDb(getStoreStateName(entry), value); + updateDb(getStoreStateName(entry), value, session); } std::vector @@ -114,7 +144,8 @@ PersistentState::getSCPStateAllSlots() std::vector states; for (uint32 i = 0; i <= mApp.getConfig().MAX_SLOTS_TO_REMEMBER; i++) { - auto val = getFromDb(getStoreStateName(kLastSCPDataXDR, i)); + auto val = getFromDb(getStoreStateName(kLastSCPDataXDR, i), + mApp.getDatabase().getMiscSession()); if (!val.empty()) { states.push_back(val); @@ -130,7 +161,8 @@ PersistentState::setSCPStateForSlot(uint64 slot, std::string const& value) ZoneScoped; auto slotIdx = static_cast( slot % (mApp.getConfig().MAX_SLOTS_TO_REMEMBER + 1)); - updateDb(getStoreStateName(kLastSCPDataXDR, slotIdx), value); + updateDb(getStoreStateName(kLastSCPDataXDR, slotIdx), value, + mApp.getDatabase().getMiscSession()); } void @@ -138,12 +170,13 @@ PersistentState::setSCPStateV1ForSlot( uint64 slot, std::string const& value, std::unordered_map const& txSets) { - soci::transaction tx(mApp.getDatabase().getSession()); + soci::transaction tx(mApp.getDatabase().getRawMiscSession()); setSCPStateForSlot(slot, value); for (auto const& txSet : txSets) { - updateDb(getStoreStateNameForTxSet(txSet.first), txSet.second); + updateDb(getStoreStateNameForTxSet(txSet.first), txSet.second, + mApp.getDatabase().getMiscSession()); } tx.commit(); } @@ -152,14 +185,17 @@ bool PersistentState::shouldRebuildForType(LedgerEntryType let) { ZoneScoped; - return !getFromDb(getStoreStateName(kRebuildLedger, let)).empty(); + return !getFromDb(getStoreStateName(kRebuildLedger, let), + mApp.getDatabase().getSession()) + .empty(); } void PersistentState::clearRebuildForType(LedgerEntryType let) { ZoneScoped; - updateDb(getStoreStateName(kRebuildLedger, let), ""); + updateDb(getStoreStateName(kRebuildLedger, let), "", + mApp.getDatabase().getSession()); } void @@ -174,15 +210,17 @@ PersistentState::setRebuildForType(LedgerEntryType let) return; } - updateDb(getStoreStateName(kRebuildLedger, let), "1"); + updateDb(getStoreStateName(kRebuildLedger, let), "1", + mApp.getDatabase().getSession()); } void -PersistentState::updateDb(std::string const& entry, std::string const& value) +PersistentState::updateDb(std::string const& entry, std::string const& value, + SessionWrapper& sess) { ZoneScoped; auto prep = mApp.getDatabase().getPreparedStatement( - "UPDATE storestate SET state = :v WHERE statename = :n;"); + "UPDATE storestate SET state = :v WHERE statename = :n;", sess); auto& st = prep.statement(); st.exchange(soci::use(value)); @@ -193,11 +231,11 @@ PersistentState::updateDb(std::string const& entry, std::string const& value) st.execute(true); } - if (st.get_affected_rows() != 1 && getFromDb(entry).empty()) + if (st.get_affected_rows() != 1 && getFromDb(entry, sess).empty()) { ZoneNamedN(insertStoreStateZone, "insert storestate", true); auto prep2 = mApp.getDatabase().getPreparedStatement( - "INSERT INTO storestate (statename, state) VALUES (:n, :v);"); + "INSERT INTO storestate (statename, state) VALUES (:n, :v);", sess); auto& st2 = prep2.statement(); st2.exchange(soci::use(entry)); st2.exchange(soci::use(value)); @@ -217,11 +255,11 @@ PersistentState::getTxSetsForAllSlots() std::vector result; std::string val; - std::string pattern = mapping[kTxSet] + "%"; + std::string pattern = miscMapping[kTxSet - kLastEntryMain - 1] + "%"; std::string statementStr = "SELECT state FROM storestate WHERE statename LIKE :n;"; auto& db = mApp.getDatabase(); - auto prep = db.getPreparedStatement(statementStr); + auto prep = db.getPreparedStatement(statementStr, db.getMiscSession()); auto& st = prep.statement(); st.exchange(soci::into(val)); st.exchange(soci::use(pattern)); @@ -247,11 +285,11 @@ PersistentState::getTxSetHashesForAllSlots() std::unordered_set result; std::string val; - std::string pattern = mapping[kTxSet] + "%"; + std::string pattern = miscMapping[kTxSet - kLastEntryMain - 1] + "%"; std::string statementStr = "SELECT statename FROM storestate WHERE statename LIKE :n;"; auto& db = mApp.getDatabase(); - auto prep = db.getPreparedStatement(statementStr); + auto prep = db.getPreparedStatement(statementStr, db.getMiscSession()); auto& st = prep.statement(); st.exchange(soci::into(val)); st.exchange(soci::use(pattern)); @@ -261,7 +299,7 @@ PersistentState::getTxSetHashesForAllSlots() st.execute(true); } - size_t offset = mapping[kTxSet].size(); + size_t offset = miscMapping[kTxSet - kLastEntryMain - 1].size(); Hash hash; size_t len = binToHex(hash).size(); @@ -275,14 +313,14 @@ PersistentState::getTxSetHashesForAllSlots() } std::string -PersistentState::getFromDb(std::string const& entry) +PersistentState::getFromDb(std::string const& entry, SessionWrapper& sess) { ZoneScoped; std::string res; auto& db = mApp.getDatabase(); auto prep = db.getPreparedStatement( - "SELECT state FROM storestate WHERE statename = :n;"); + "SELECT state FROM storestate WHERE statename = :n;", sess); auto& st = prep.statement(); st.exchange(soci::into(res)); st.exchange(soci::use(entry)); @@ -301,14 +339,14 @@ PersistentState::getFromDb(std::string const& entry) } bool -PersistentState::entryExists(std::string const& entry) +PersistentState::entryExists(std::string const& entry, SessionWrapper& session) { ZoneScoped; int res = 0; auto& db = mApp.getDatabase(); auto prep = db.getPreparedStatement( - "SELECT COUNT(*) FROM storestate WHERE statename = :n;"); + "SELECT COUNT(*) FROM storestate WHERE statename = :n;", session); auto& st = prep.statement(); st.exchange(soci::into(res)); st.exchange(soci::use(entry)); @@ -317,4 +355,5 @@ PersistentState::entryExists(std::string const& entry) return res > 0; } + } diff --git a/src/main/PersistentState.h b/src/main/PersistentState.h index c22cd59e57..ba06ecaeaf 100644 --- a/src/main/PersistentState.h +++ b/src/main/PersistentState.h @@ -19,23 +19,30 @@ class PersistentState enum Entry { - kLastClosedLedger = 0, + // Main database entries + kLastClosedLedger, kHistoryArchiveState, - kLastSCPData, kDatabaseSchema, kNetworkPassphrase, - kLedgerUpgrades, kRebuildLedger, + kDBBackend, + kLastEntryMain, // Marker for the end of main database entries + + // Misc database entries + kLastSCPData = kLastEntryMain + 1, + kLedgerUpgrades, kLastSCPDataXDR, kTxSet, - kDBBackend, - kLastEntry, + kMiscDatabaseSchema, + kLastEntry // Marker for the end of misc database entries }; static void dropAll(Database& db); + static void dropMisc(Database& db); - std::string getState(Entry stateName); - void setState(Entry stateName, std::string const& value); + std::string getState(Entry stateName, SessionWrapper& session); + void setState(Entry stateName, std::string const& value, + SessionWrapper& session); // Special methods for SCP state (multiple slots) std::vector getSCPStateAllSlots(); @@ -52,19 +59,23 @@ class PersistentState bool hasTxSet(Hash const& txSetHash); void deleteTxSets(std::unordered_set hashesToDelete); + std::string getStoreStateName(Entry n, uint32 subscript = 0) const; private: static std::string kSQLCreateStatement; - static std::string mapping[kLastEntry]; + static std::string mainMapping[kLastEntryMain]; + static std::string miscMapping[kLastEntry]; Application& mApp; - std::string getStoreStateName(Entry n, uint32 subscript = 0); std::string getStoreStateNameForTxSet(Hash const& txSetHash); void setSCPStateForSlot(uint64 slot, std::string const& value); - void updateDb(std::string const& entry, std::string const& value); - std::string getFromDb(std::string const& entry); - bool entryExists(std::string const& entry); + void updateDb(std::string const& entry, std::string const& value, + SessionWrapper& session); + + std::string getFromDb(std::string const& entry, SessionWrapper& session); + + bool entryExists(std::string const& entry, SessionWrapper& session); }; } diff --git a/src/overlay/BanManagerImpl.cpp b/src/overlay/BanManagerImpl.cpp index a433ef94e0..26938001e5 100644 --- a/src/overlay/BanManagerImpl.cpp +++ b/src/overlay/BanManagerImpl.cpp @@ -44,7 +44,8 @@ BanManagerImpl::banNode(NodeID nodeID) { ZoneNamedN(insertBanZone, "insert ban", true); auto prep = mApp.getDatabase().getPreparedStatement( - "INSERT INTO ban (nodeid) VALUES(:n)"); + "INSERT INTO ban (nodeid) VALUES(:n)", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(nodeIDString)); st.define_and_bind(); @@ -61,7 +62,8 @@ BanManagerImpl::unbanNode(NodeID nodeID) { ZoneNamedN(deleteBanZone, "delete ban", true); auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM ban WHERE nodeid = :n;"); + "DELETE FROM ban WHERE nodeid = :n;", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::use(nodeIDString)); st.define_and_bind(); @@ -77,7 +79,8 @@ BanManagerImpl::isBanned(NodeID nodeID) { ZoneNamedN(selectBanZone, "select ban", true); auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT count(*) FROM ban WHERE nodeid = :n"); + "SELECT count(*) FROM ban WHERE nodeid = :n", + mApp.getDatabase().getMiscSession()); uint32_t count; auto& st = prep.statement(); st.exchange(soci::into(count)); @@ -96,8 +99,8 @@ BanManagerImpl::getBans() std::string nodeIDString; { ZoneNamedN(selectBanZone, "select ban", true); - auto prep = - mApp.getDatabase().getPreparedStatement("SELECT nodeid FROM ban"); + auto prep = mApp.getDatabase().getPreparedStatement( + "SELECT nodeid FROM ban", mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(soci::into(nodeIDString)); st.define_and_bind(); @@ -114,10 +117,10 @@ BanManagerImpl::getBans() void BanManager::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS ban"; + db.getRawMiscSession() << "DROP TABLE IF EXISTS ban"; - db.getSession() << "CREATE TABLE ban (" - "nodeid CHARACTER(56) NOT NULL PRIMARY KEY" - ")"; + db.getRawMiscSession() << "CREATE TABLE ban (" + "nodeid CHARACTER(56) NOT NULL PRIMARY KEY" + ")"; } } diff --git a/src/overlay/PeerManager.cpp b/src/overlay/PeerManager.cpp index 4abfbc946c..7f02772f5a 100644 --- a/src/overlay/PeerManager.cpp +++ b/src/overlay/PeerManager.cpp @@ -96,7 +96,7 @@ PeerManager::loadRandomPeers(PeerQuery const& query, size_t size) size = std::max(size, BATCH_SIZE); // if we ever start removing peers from db, we may need to enable this - // soci::transaction sqltx(mApp.getDatabase().getSession()); + // soci::transaction sqltx(mApp.getDatabase().getMiscSession()); // mApp.getDatabase().setCurrentTransactionReadOnly(); std::vector conditions; @@ -180,7 +180,8 @@ PeerManager::removePeersWithManyFailures(size_t minNumFailures, sql += " AND ip = :ip"; } - auto prep = db.getPreparedStatement(sql); + auto prep = + db.getPreparedStatement(sql, mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(use(minNumFailures)); @@ -237,7 +238,8 @@ PeerManager::load(PeerBareAddress const& address) { auto prep = mApp.getDatabase().getPreparedStatement( "SELECT numfailures, nextattempt, type FROM peers " - "WHERE ip = :v1 AND port = :v2"); + "WHERE ip = :v1 AND port = :v2", + mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(into(result.mNumFailures)); st.exchange(into(result.mNextAttempt)); @@ -294,7 +296,8 @@ PeerManager::store(PeerBareAddress const& address, PeerRecord const& peerRecord, try { - auto prep = mApp.getDatabase().getPreparedStatement(query); + auto prep = mApp.getDatabase().getPreparedStatement( + query, mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(use(peerRecord.mNextAttempt)); st.exchange(use(peerRecord.mNumFailures)); @@ -503,7 +506,8 @@ PeerManager::countPeers(std::string const& where, { std::string sql = "SELECT COUNT(*) FROM peers WHERE " + where; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement( + sql, mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); bind(st); @@ -533,7 +537,8 @@ PeerManager::loadPeers(size_t limit, size_t offset, std::string const& where, "FROM peers WHERE " + where + " LIMIT :limit OFFSET :offset"; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement( + sql, mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); bind(st); @@ -570,8 +575,8 @@ PeerManager::loadPeers(size_t limit, size_t offset, std::string const& where, void PeerManager::dropAll(Database& db) { - db.getSession() << "DROP TABLE IF EXISTS peers;"; - db.getSession() << kSQLCreateStatement; + db.getRawMiscSession() << "DROP TABLE IF EXISTS peers;"; + db.getRawMiscSession() << kSQLCreateStatement; } std::vector> @@ -588,7 +593,8 @@ PeerManager::loadAllPeers() int port; PeerRecord record; - auto prep = mApp.getDatabase().getPreparedStatement(sql); + auto prep = mApp.getDatabase().getPreparedStatement( + sql, mApp.getDatabase().getMiscSession()); auto& st = prep.statement(); st.exchange(into(ip)); @@ -621,7 +627,7 @@ void PeerManager::storePeers( std::vector> peers) { - soci::transaction tx(mApp.getDatabase().getSession()); + soci::transaction tx(mApp.getDatabase().getRawMiscSession()); for (auto const& peer : peers) { store(peer.first, peer.second, /* inDatabase */ false); diff --git a/src/overlay/test/OverlayManagerTests.cpp b/src/overlay/test/OverlayManagerTests.cpp index 5255100fba..afba967e8b 100644 --- a/src/overlay/test/OverlayManagerTests.cpp +++ b/src/overlay/test/OverlayManagerTests.cpp @@ -165,7 +165,7 @@ class OverlayManagerTests pm.storeConfigPeers(); } - rowset rs = app->getDatabase().getSession().prepare + rowset rs = app->getDatabase().getRawSession().prepare << "SELECT ip,port,type FROM peers ORDER BY ip, port"; auto& ppeers = pm.mConfigurationPreferredPeers; @@ -213,7 +213,7 @@ class OverlayManagerTests pm.mResolvedPeers.wait(); pm.tick(); - rowset rs = app->getDatabase().getSession().prepare + rowset rs = app->getDatabase().getRawSession().prepare << "SELECT ip,port,type FROM peers ORDER BY ip, port"; int found = 0; diff --git a/src/transactions/TransactionSQL.cpp b/src/transactions/TransactionSQL.cpp index c35a725f36..29c9b34a3d 100644 --- a/src/transactions/TransactionSQL.cpp +++ b/src/transactions/TransactionSQL.cpp @@ -348,20 +348,21 @@ void dropSupportTransactionFeeHistory(Database& db) { ZoneScoped; - db.getSession() << "DROP TABLE IF EXISTS txfeehistory"; + db.getRawSession() << "DROP TABLE IF EXISTS txfeehistory"; } void dropSupportTxSetHistory(Database& db) { ZoneScoped; - db.getSession() << "DROP TABLE IF EXISTS txsethistory"; + db.getRawSession() << "DROP TABLE IF EXISTS txsethistory"; } void dropSupportTxHistory(Database& db) { + releaseAssert(threadIsMain()); ZoneScoped; - db.getSession() << "DROP TABLE IF EXISTS txhistory"; + db.getRawSession() << "DROP TABLE IF EXISTS txhistory"; } }