diff --git a/src/common/bustub_instance.cpp b/src/common/bustub_instance.cpp index 5946d76dc..58697ccc3 100644 --- a/src/common/bustub_instance.cpp +++ b/src/common/bustub_instance.cpp @@ -214,11 +214,9 @@ auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer, try { auto result = ExecuteSqlTxn(sql, writer, txn, std::move(check_options)); txn_manager_->Commit(txn); - delete txn; return result; } catch (bustub::Exception &ex) { txn_manager_->Abort(txn); - delete txn; throw ex; } } @@ -347,7 +345,6 @@ void BustubInstance::GenerateTestTable() { l.unlock(); txn_manager_->Commit(txn); - delete txn; } /** @@ -366,7 +363,6 @@ void BustubInstance::GenerateMockTable() { l.unlock(); txn_manager_->Commit(txn); - delete txn; } BustubInstance::~BustubInstance() { diff --git a/src/common/config.cpp b/src/common/config.cpp index 7405670c2..146f8f7f6 100644 --- a/src/common/config.cpp +++ b/src/common/config.cpp @@ -20,4 +20,6 @@ std::chrono::duration log_timeout = std::chrono::seconds(1); std::chrono::milliseconds cycle_detection_interval = std::chrono::milliseconds(50); +std::atomic global_disable_execution_exception_print{false}; + } // namespace bustub diff --git a/src/concurrency/CMakeLists.txt b/src/concurrency/CMakeLists.txt index 8974845a7..41d41b9e7 100644 --- a/src/concurrency/CMakeLists.txt +++ b/src/concurrency/CMakeLists.txt @@ -1,9 +1,10 @@ add_library( bustub_concurrency OBJECT - lock_manager.cpp - transaction_manager.cpp) + transaction_manager.cpp + transaction_manager_impl.cpp + watermark.cpp) set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ - PARENT_SCOPE) + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/concurrency/transaction_manager.cpp b/src/concurrency/transaction_manager.cpp index b1b06d057..27f4c91b0 100644 --- a/src/concurrency/transaction_manager.cpp +++ b/src/concurrency/transaction_manager.cpp @@ -12,34 +12,53 @@ #include "concurrency/transaction_manager.h" +#include #include // NOLINT +#include #include #include #include #include "catalog/catalog.h" +#include "catalog/column.h" +#include "catalog/schema.h" +#include "common/config.h" +#include "common/exception.h" #include "common/macros.h" +#include "concurrency/transaction.h" +#include "execution/execution_common.h" #include "storage/table/table_heap.h" +#include "storage/table/tuple.h" +#include "type/type_id.h" +#include "type/value.h" +#include "type/value_factory.h" namespace bustub { -void TransactionManager::Commit(Transaction *txn) { - // Release all the locks. - ReleaseLocks(txn); +auto TransactionManager::Begin(IsolationLevel isolation_level) -> Transaction * { + std::unique_lock l(txn_map_mutex_); + auto txn_id = next_txn_id_++; + auto txn = std::make_unique(txn_id, isolation_level); + auto *txn_ref = txn.get(); + txn_map_.insert(std::make_pair(txn_id, std::move(txn))); - txn->SetState(TransactionState::COMMITTED); -} - -void TransactionManager::Abort(Transaction *txn) { - /* TODO: revert all the changes in write set */ + // TODO(fall2023): set the timestamps and compute watermark. - ReleaseLocks(txn); + return txn_ref; +} - txn->SetState(TransactionState::ABORTED); +auto TransactionManager::Commit(Transaction *txn) -> bool { + std::lock_guard commit_lck(commit_mutex_); + // TODO(fall2023): Implement me! + txn->state_ = TransactionState::COMMITTED; + return true; } -void TransactionManager::BlockAllTransactions() { UNIMPLEMENTED("block is not supported now!"); } +void TransactionManager::Abort(Transaction *txn) { + // TODO(fall2023): Implement me! + txn->state_ = TransactionState::ABORTED; +} -void TransactionManager::ResumeTransactions() { UNIMPLEMENTED("resume is not supported now!"); } +void TransactionManager::GarbageCollection() { UNIMPLEMENTED("not implemented"); } } // namespace bustub diff --git a/src/concurrency/transaction_manager_impl.cpp b/src/concurrency/transaction_manager_impl.cpp new file mode 100644 index 000000000..28dd044ab --- /dev/null +++ b/src/concurrency/transaction_manager_impl.cpp @@ -0,0 +1,112 @@ +// DO NOT CHANGE THIS FILE, this file will not be included in the autograder. + +#include +#include +#include // NOLINT +#include +#include +#include +#include + +#include "catalog/catalog.h" +#include "catalog/column.h" +#include "catalog/schema.h" +#include "common/config.h" +#include "common/exception.h" +#include "common/macros.h" +#include "concurrency/transaction.h" +#include "concurrency/transaction_manager.h" +#include "execution/execution_common.h" +#include "storage/table/table_heap.h" +#include "storage/table/tuple.h" +#include "type/type_id.h" +#include "type/value.h" +#include "type/value_factory.h" + +namespace bustub { + +auto TransactionManager::UpdateVersionLink(RID rid, std::optional prev_version, + std::function)> &&check) -> bool { + std::unique_lock lck(version_info_mutex_); + std::shared_ptr pg_ver_info = nullptr; + auto iter = version_info_.find(rid.GetPageId()); + if (iter == version_info_.end()) { + pg_ver_info = std::make_shared(); + version_info_[rid.GetPageId()] = pg_ver_info; + } else { + pg_ver_info = iter->second; + } + std::unique_lock lck2(pg_ver_info->mutex_); + lck.unlock(); + auto iter2 = pg_ver_info->prev_version_.find(rid.GetSlotNum()); + if (iter2 == pg_ver_info->prev_version_.end()) { + if (check != nullptr && !check(std::nullopt)) { + return false; + } + } else { + if (check != nullptr && !check(iter2->second)) { + return false; + } + } + if (prev_version.has_value()) { + pg_ver_info->prev_version_[rid.GetSlotNum()] = *prev_version; + } else { + pg_ver_info->prev_version_.erase(rid.GetSlotNum()); + } + return true; +} + +auto TransactionManager::GetVersionLink(RID rid) -> std::optional { + std::shared_lock lck(version_info_mutex_); + auto iter = version_info_.find(rid.GetPageId()); + if (iter == version_info_.end()) { + return std::nullopt; + } + std::shared_ptr pg_ver_info = iter->second; + std::unique_lock lck2(pg_ver_info->mutex_); + lck.unlock(); + auto iter2 = pg_ver_info->prev_version_.find(rid.GetSlotNum()); + if (iter2 == pg_ver_info->prev_version_.end()) { + return std::nullopt; + } + return std::make_optional(iter2->second); +} + +auto TransactionManager::GetUndoLink(RID rid) -> std::optional { + auto version_link = GetVersionLink(rid); + if (version_link.has_value()) { + return version_link->prev_; + } + return std::nullopt; +} + +auto TransactionManager::GetUndoLogOptional(UndoLink link) -> std::optional { + std::shared_lock lck(txn_map_mutex_); + auto iter = txn_map_.find(link.prev_txn_); + if (iter == txn_map_.end()) { + return std::nullopt; + } + auto txn = iter->second; + lck.unlock(); + return txn->GetUndoLog(link.prev_log_idx_); +} + +auto TransactionManager::GetUndoLog(UndoLink link) -> UndoLog { + auto undo_log = GetUndoLogOptional(link); + if (undo_log.has_value()) { + return *undo_log; + } + throw Exception("undo log not exist"); +} + +void Transaction::SetTainted() { + auto state = state_.load(); + if (state == TransactionState::RUNNING) { + state_.store(TransactionState::TAINTED); + return; + } + fmt::println(stderr, "transaction not in running state: {}", state); + std::terminate(); +} + +} // namespace bustub diff --git a/src/concurrency/watermark.cpp b/src/concurrency/watermark.cpp new file mode 100644 index 000000000..c8a035d63 --- /dev/null +++ b/src/concurrency/watermark.cpp @@ -0,0 +1,11 @@ +#include "concurrency/watermark.h" +#include +#include "common/exception.h" + +namespace bustub { + +auto Watermark::AddTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); } + +auto Watermark::RemoveTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); } + +} // namespace bustub diff --git a/src/execution/CMakeLists.txt b/src/execution/CMakeLists.txt index 6704b749b..cab89f5c0 100644 --- a/src/execution/CMakeLists.txt +++ b/src/execution/CMakeLists.txt @@ -3,6 +3,7 @@ add_library( OBJECT aggregation_executor.cpp delete_executor.cpp + execution_common.cpp executor_factory.cpp filter_executor.cpp fmt_impl.cpp diff --git a/src/execution/execution_common.cpp b/src/execution/execution_common.cpp new file mode 100644 index 000000000..6cdb99b2f --- /dev/null +++ b/src/execution/execution_common.cpp @@ -0,0 +1,25 @@ +#include "execution/execution_common.h" +#include "catalog/catalog.h" +#include "common/config.h" +#include "common/macros.h" +#include "concurrency/transaction_manager.h" +#include "fmt/core.h" +#include "storage/table/table_heap.h" +#include "type/value.h" +#include "type/value_factory.h" + +namespace bustub { + +auto ReconstructTuple(const Schema *schema, const Tuple &base_tuple, const TupleMeta &base_meta, + const std::vector &undo_logs) -> std::optional { + UNIMPLEMENTED("not implemented"); +} + +void TxnMgrDbg(const std::string &info, TransactionManager *txn_mgr, const TableInfo *table_info, + TableHeap *table_heap) { + // always use stderr for printing logs... + fmt::println(stderr, "debug_hook: {}", info); + // noop +} + +} // namespace bustub diff --git a/src/include/catalog/column.h b/src/include/catalog/column.h index a378f2e79..b645ca2a3 100644 --- a/src/include/catalog/column.h +++ b/src/include/catalog/column.h @@ -45,6 +45,7 @@ class Column { * @param column_name name of the column * @param type type of column * @param length length of the varlen + * @param expr expression used to create this column */ Column(std::string column_name, TypeId type, uint32_t length) : column_name_(std::move(column_name)), diff --git a/src/include/common/bustub_instance.h b/src/include/common/bustub_instance.h index 10cb47daa..c4886611a 100644 --- a/src/include/common/bustub_instance.h +++ b/src/include/common/bustub_instance.h @@ -113,6 +113,20 @@ class SimpleStreamWriter : public ResultWriter { std::string separator_; }; +class StringVectorWriter : public ResultWriter { + public: + void WriteCell(const std::string &cell) override { values_.back().push_back(cell); } + void WriteHeaderCell(const std::string &cell) override {} + void BeginHeader() override {} + void EndHeader() override {} + void BeginRow() override { values_.emplace_back(); } + void EndRow() override {} + void BeginTable(bool simplified_output) override {} + void EndTable() override {} + + std::vector> values_; +}; + class HtmlWriter : public ResultWriter { auto Escape(const std::string &data) -> std::string { std::string buffer; diff --git a/src/include/common/config.h b/src/include/common/config.h index 8688cd4ea..fb004eba1 100644 --- a/src/include/common/config.h +++ b/src/include/common/config.h @@ -16,6 +16,9 @@ #include // NOLINT #include +#define DISABLE_LOCK_MANAGER +#define DISABLE_CHECKPOINT_MANAGER + namespace bustub { /** Cycle detection is performed every CYCLE_DETECTION_INTERVAL milliseconds. */ diff --git a/src/include/common/exception.h b/src/include/common/exception.h index 2e671de4b..3ac1f3171 100644 --- a/src/include/common/exception.h +++ b/src/include/common/exception.h @@ -12,6 +12,7 @@ #pragma once +#include #include #include #include @@ -51,6 +52,8 @@ enum class ExceptionType { EXECUTION = 12, }; +extern std::atomic global_disable_execution_exception_print; + class Exception : public std::runtime_error { public: /** @@ -75,9 +78,9 @@ class Exception : public std::runtime_error { Exception(ExceptionType exception_type, const std::string &message, bool print = true) : std::runtime_error(message), type_(exception_type) { #ifndef NDEBUG - if (print) { + if (print && !global_disable_execution_exception_print.load()) { std::string exception_message = - "\nException Type :: " + ExceptionTypeToString(type_) + "\nMessage :: " + message + "\n"; + "\nException Type :: " + ExceptionTypeToString(type_) + ", Message :: " + message + "\n\n"; std::cerr << exception_message; } #endif @@ -109,6 +112,8 @@ class Exception : public std::runtime_error { return "Out of Memory"; case ExceptionType::NOT_IMPLEMENTED: return "Not implemented"; + case ExceptionType::EXECUTION: + return "Execution"; default: return "Unknown"; } @@ -127,7 +132,7 @@ class NotImplementedException : public Exception { class ExecutionException : public Exception { public: ExecutionException() = delete; - explicit ExecutionException(const std::string &msg) : Exception(ExceptionType::EXECUTION, msg, false) {} + explicit ExecutionException(const std::string &msg) : Exception(ExceptionType::EXECUTION, msg, true) {} }; } // namespace bustub diff --git a/src/include/concurrency/lock_manager.h b/src/include/concurrency/lock_manager.h index 52b25a905..aba2c2db5 100644 --- a/src/include/concurrency/lock_manager.h +++ b/src/include/concurrency/lock_manager.h @@ -85,8 +85,8 @@ class LockManager { cycle_detection_thread_ = new std::thread(&LockManager::RunCycleDetection, this); } - ~LockManager() { #ifndef DISABLE_LOCK_MANAGER + ~LockManager() { UnlockAll(); enable_cycle_detection_ = false; @@ -95,8 +95,10 @@ class LockManager { cycle_detection_thread_->join(); delete cycle_detection_thread_; } -#endif } +#else + ~LockManager() = default; +#endif /** * [LOCK_NOTE] diff --git a/src/include/concurrency/transaction.h b/src/include/concurrency/transaction.h index ab7e81a1c..77588273d 100644 --- a/src/include/concurrency/transaction.h +++ b/src/include/concurrency/transaction.h @@ -14,168 +14,74 @@ #include #include +#include +#include #include +#include +#include #include +#include // NOLINT #include #include // NOLINT #include #include +#include +#include #include "common/config.h" #include "common/logger.h" +#include "execution/expressions/abstract_expression.h" #include "storage/page/page.h" #include "storage/table/tuple.h" namespace bustub { -/** - * Transaction states for 2PL: - * Running transactions could be aborted during either `GROWING` or `SHRINKING` stage. - * - * _________________________ - * | | - * | v - * GROWING -> SHRINKING -> COMMITTED ABORTED - * | | ^ - * |___________|________________________| - * - * Transaction states for Non-2PL: - * Running transactions could only be aborted during `GROWING` stage, since there is no `SHRINKING` stage. - * - * __________ - * | | - * | v - * GROWING -> COMMITTED ABORTED - * | ^ - * |_________________________| - * - */ -enum class TransactionState { GROWING, SHRINKING, COMMITTED, ABORTED }; +class TransactionManager; /** - * Transaction isolation level. + * Transaction State. */ -enum class IsolationLevel { READ_UNCOMMITTED, REPEATABLE_READ, READ_COMMITTED }; +enum class TransactionState { RUNNING = 0, TAINTED, COMMITTED = 100, ABORTED }; /** - * Type of write operation. + * Transaction isolation level. `READ_UNCOMMITTED` will be used throughout project 3 as the default isolation level. + * In project 4, if a txn is in `READ_UNCOMMITTED` mode, it CAN ONLY be read-only. */ -enum class WType { INSERT = 0, DELETE, UPDATE }; +enum class IsolationLevel { READ_UNCOMMITTED, SNAPSHOT_ISOLATION, SERIALIZABLE }; class TableHeap; class Catalog; using table_oid_t = uint32_t; using index_oid_t = uint32_t; -/** - * WriteRecord tracks information related to a write. - */ -class TableWriteRecord { - public: - // NOLINTNEXTLINE - TableWriteRecord(table_oid_t tid, RID rid, TableHeap *table_heap) : tid_(tid), rid_(rid), table_heap_(table_heap) {} - - table_oid_t tid_; - RID rid_; - TableHeap *table_heap_; +/** Represents a link to a previous version of this tuple */ +struct UndoLink { + /* Previous version can be found in which txn */ + txn_id_t prev_txn_{INVALID_TXN_ID}; + /* The log index of the previous version in `prev_txn_` */ + int prev_log_idx_{0}; - // Recording write type might be useful if you want to implement in-place update for leaderboard - // optimization. You don't need it for the basic implementation. - WType wtype_; -}; - -/** - * WriteRecord tracks information related to a write. - */ -class IndexWriteRecord { - public: - // NOLINTNEXTLINE - IndexWriteRecord(RID rid, table_oid_t table_oid, WType wtype, const Tuple &tuple, index_oid_t index_oid, - Catalog *catalog) - : rid_(rid), table_oid_(table_oid), wtype_(wtype), tuple_(tuple), index_oid_(index_oid), catalog_(catalog) {} - - /** - * Note(spring2023): I don't know what are these for. If you are implementing leaderboard optimizations, you can - * figure out how to use this structure to store what you need. - */ + friend auto operator==(const UndoLink &a, const UndoLink &b) { + return a.prev_txn_ == b.prev_txn_ && a.prev_log_idx_ == b.prev_log_idx_; + } - /** The rid is the value stored in the index. */ - RID rid_; - /** Table oid. */ - table_oid_t table_oid_; - /** Write type. */ - WType wtype_; - /** The tuple is used to construct an index key. */ - Tuple tuple_; - /** The old tuple is only used for the update operation. */ - Tuple old_tuple_; - /** Each table has an index list, this is the identifier of an index into the list. */ - index_oid_t index_oid_; - /** The catalog contains metadata required to locate index. */ - Catalog *catalog_; -}; + friend auto operator!=(const UndoLink &a, const UndoLink &b) { return !(a == b); } -/** - * Reason to a transaction abortion - */ -enum class AbortReason { - LOCK_ON_SHRINKING, - UPGRADE_CONFLICT, - LOCK_SHARED_ON_READ_UNCOMMITTED, - TABLE_LOCK_NOT_PRESENT, - ATTEMPTED_INTENTION_LOCK_ON_ROW, - TABLE_UNLOCKED_BEFORE_UNLOCKING_ROWS, - INCOMPATIBLE_UPGRADE, - ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD + auto IsValid() const -> bool { return prev_txn_ != INVALID_TXN_ID; } }; -/** - * TransactionAbortException is thrown when state of a transaction is changed to ABORTED - */ -class TransactionAbortException : public std::exception { - public: - explicit TransactionAbortException(txn_id_t txn_id, AbortReason abort_reason) - : txn_id_(txn_id), abort_reason_(abort_reason) {} - - /** @return this transaction id */ - auto GetTransactionId() -> txn_id_t { return txn_id_; } - - /** @return the abort reason for this transaction */ - auto GetAbortReason() -> AbortReason { return abort_reason_; } - - /** @return the detailed information of abort reason */ - auto GetInfo() -> std::string { - switch (abort_reason_) { - case AbortReason::LOCK_ON_SHRINKING: - return "Transaction " + std::to_string(txn_id_) + - " aborted because it can not take locks in the shrinking state\n"; - case AbortReason::UPGRADE_CONFLICT: - return "Transaction " + std::to_string(txn_id_) + - " aborted because another transaction is already waiting to upgrade its lock\n"; - case AbortReason::LOCK_SHARED_ON_READ_UNCOMMITTED: - return "Transaction " + std::to_string(txn_id_) + " aborted on lockshared on READ_UNCOMMITTED\n"; - case AbortReason::TABLE_LOCK_NOT_PRESENT: - return "Transaction " + std::to_string(txn_id_) + " aborted because table lock not present\n"; - case AbortReason::ATTEMPTED_INTENTION_LOCK_ON_ROW: - return "Transaction " + std::to_string(txn_id_) + " aborted because intention lock attempted on row\n"; - case AbortReason::TABLE_UNLOCKED_BEFORE_UNLOCKING_ROWS: - return "Transaction " + std::to_string(txn_id_) + - " aborted because table locks dropped before dropping row locks\n"; - case AbortReason::INCOMPATIBLE_UPGRADE: - return "Transaction " + std::to_string(txn_id_) + " aborted because attempted lock upgrade is incompatible\n"; - case AbortReason::ATTEMPTED_UNLOCK_BUT_NO_LOCK_HELD: - return "Transaction " + std::to_string(txn_id_) + " aborted because attempted to unlock but no lock held \n"; - default: - // Unknown AbortReason - throw bustub::Exception("Unknown abort reason for transaction " + std::to_string(txn_id_)); - } - // This is impossible - assert(false); - } - - private: - txn_id_t txn_id_; - AbortReason abort_reason_; +/* Once the undo log is added to the txn, it becomes read-only and should NOT be changed except prev_version_. */ +struct UndoLog { + /* Whether this log is a deletion marker */ + bool is_deleted_; + /* The fields modified by this redo log */ + std::vector modified_fields_; + /* The modified fields */ + Tuple tuple_; + /* Timestamp of this undo log */ + timestamp_t ts_{INVALID_TS}; + /* Undo log prev version */ + UndoLink prev_version_{}; }; /** @@ -183,24 +89,8 @@ class TransactionAbortException : public std::exception { */ class Transaction { public: - explicit Transaction(txn_id_t txn_id, IsolationLevel isolation_level = IsolationLevel::REPEATABLE_READ) - : isolation_level_(isolation_level), - thread_id_(std::this_thread::get_id()), - txn_id_(txn_id), - prev_lsn_(INVALID_LSN), - s_table_lock_set_{new std::unordered_set}, - x_table_lock_set_{new std::unordered_set}, - is_table_lock_set_{new std::unordered_set}, - ix_table_lock_set_{new std::unordered_set}, - six_table_lock_set_{new std::unordered_set}, - s_row_lock_set_{new std::unordered_map>}, - x_row_lock_set_{new std::unordered_map>} { - // Initialize the sets that will be tracked. - table_write_set_ = std::make_shared>(); - index_write_set_ = std::make_shared>(); - page_set_ = std::make_shared>(); - deleted_page_set_ = std::make_shared>(); - } + explicit Transaction(txn_id_t txn_id, IsolationLevel isolation_level = IsolationLevel::SNAPSHOT_ISOLATION) + : isolation_level_(isolation_level), thread_id_(std::this_thread::get_id()), txn_id_(txn_id) {} ~Transaction() = default; @@ -212,182 +102,97 @@ class Transaction { /** @return the id of this transaction */ inline auto GetTransactionId() const -> txn_id_t { return txn_id_; } - /** @return the isolation level of this transaction */ - inline auto GetIsolationLevel() const -> IsolationLevel { return isolation_level_; } - - /** @return the list of table write records of this transaction */ - inline auto GetWriteSet() -> std::shared_ptr> { return table_write_set_; } + /** @return the id of this transaction, stripping the highest bit. NEVER use/store this value unless for debugging. */ + inline auto GetTransactionIdHumanReadable() const -> txn_id_t { return txn_id_ ^ TXN_START_ID; } - /** @return the list of index write records of this transaction */ - inline auto GetIndexWriteSet() -> std::shared_ptr> { return index_write_set_; } + /** @return the temporary timestamp of this transaction */ + inline auto GetTransactionTempTs() const -> timestamp_t { return txn_id_; } - /** @return the page set */ - inline auto GetPageSet() -> std::shared_ptr> { return page_set_; } - - /** - * Adds a tuple write record into the table write set. - * @param write_record write record to be added - */ - inline void AppendTableWriteRecord(const TableWriteRecord &write_record) { - table_write_set_->push_back(write_record); - } - - /** - * Adds an index write record into the index write set. - * @param write_record write record to be added - */ - inline void AppendIndexWriteRecord(const IndexWriteRecord &write_record) { - index_write_set_->push_back(write_record); - } - - /** - * Adds a page into the page set. - * @param page page to be added - */ - inline void AddIntoPageSet(Page *page) { page_set_->push_back(page); } - - /** @return the deleted page set */ - inline auto GetDeletedPageSet() -> std::shared_ptr> { return deleted_page_set_; } - - /** - * Adds a page to the deleted page set. - * @param page_id id of the page to be marked as deleted - */ - inline void AddIntoDeletedPageSet(page_id_t page_id) { deleted_page_set_->insert(page_id); } - - /** @return the set of rows under a shared lock */ - inline auto GetSharedRowLockSet() -> std::shared_ptr>> { - return s_row_lock_set_; - } + /** @return the isolation level of this transaction */ + inline auto GetIsolationLevel() const -> IsolationLevel { return isolation_level_; } - /** @return the set of rows in under an exclusive lock */ - inline auto GetExclusiveRowLockSet() -> std::shared_ptr>> { - return x_row_lock_set_; - } + /** @return the transaction state */ + inline auto GetTransactionState() const -> TransactionState { return state_; } - /** @return the set of table resources under a shared lock */ - inline auto GetSharedTableLockSet() -> std::shared_ptr> { return s_table_lock_set_; } + /** @return the read ts */ + inline auto GetReadTs() const -> timestamp_t { return read_ts_; } - /** @return the set of table resources under a exclusive lock */ - inline auto GetExclusiveTableLockSet() -> std::shared_ptr> { - return x_table_lock_set_; - } + /** @return the commit ts */ + inline auto GetCommitTs() const -> timestamp_t { return commit_ts_; } - /** @return the set of table resources under a intention shared lock */ - inline auto GetIntentionSharedTableLockSet() -> std::shared_ptr> { - return is_table_lock_set_; + /** Modify an existing undo log. */ + inline auto ModifyUndoLog(int log_idx, UndoLog new_log) { + std::scoped_lock lck(latch_); + undo_logs_[log_idx] = std::move(new_log); } - /** @return the set of table resources under a intention exclusive lock */ - inline auto GetIntentionExclusiveTableLockSet() -> std::shared_ptr> { - return ix_table_lock_set_; + /** @return the index of the undo log in this transaction */ + inline auto AppendUndoLog(UndoLog log) -> UndoLink { + std::scoped_lock lck(latch_); + undo_logs_.emplace_back(std::move(log)); + return {txn_id_, static_cast(undo_logs_.size() - 1)}; } - /** @return the set of table resources under a shared intention exclusive lock */ - inline auto GetSharedIntentionExclusiveTableLockSet() -> std::shared_ptr> { - return six_table_lock_set_; + inline auto AppendWriteSet(table_oid_t t, RID rid) { + std::scoped_lock lck(latch_); + write_set_[t].insert(rid); } - /** @return true if the specified row (belong to table oid) is shared locked by this transaction */ - auto IsRowSharedLocked(const table_oid_t &oid, const RID &rid) -> bool { - auto row_lock_set = s_row_lock_set_->find(oid); - if (row_lock_set == s_row_lock_set_->end()) { - return false; - } - return row_lock_set->second.find(rid) != row_lock_set->second.end(); + inline auto AppendScanPredicate(table_oid_t t, const AbstractExpressionRef &predicate) { + std::scoped_lock lck(latch_); + scan_predicates_.emplace_back(predicate); } - /** @return true if the specified row (belong to table oid) is exclusive locked by this transaction */ - auto IsRowExclusiveLocked(const table_oid_t &oid, const RID &rid) -> bool { - auto row_lock_set = x_row_lock_set_->find(oid); - if (row_lock_set == x_row_lock_set_->end()) { - return false; - } - return row_lock_set->second.find(rid) != row_lock_set->second.end(); + inline auto GetUndoLog(size_t log_id) -> UndoLog { + std::scoped_lock lck(latch_); + return undo_logs_[log_id]; } - /** @return true if the table (specified by oid) is intention shared locked by this transaction */ - auto IsTableIntentionSharedLocked(const table_oid_t &oid) -> bool { - return is_table_lock_set_->find(oid) != is_table_lock_set_->end(); + inline auto GetUndoLogNum() -> size_t { + std::scoped_lock lck(latch_); + return undo_logs_.size(); } - /** @return true if the table (specified by oid) is shared locked by this transaction */ - auto IsTableSharedLocked(const table_oid_t &oid) -> bool { - return s_table_lock_set_->find(oid) != s_table_lock_set_->end(); - } + void SetTainted(); - /** @return true if the table (specified by oid) is intention exclusive locked by this transaction */ - auto IsTableIntentionExclusiveLocked(const table_oid_t &oid) -> bool { - return ix_table_lock_set_->find(oid) != ix_table_lock_set_->end(); - } + private: + friend class TransactionManager; - /** @return true if the table (specified by oid) is exclusive locked by this transaction */ - auto IsTableExclusiveLocked(const table_oid_t &oid) -> bool { - return x_table_lock_set_->find(oid) != x_table_lock_set_->end(); - } + // The below fields should be ONLY changed by txn manager (with the txn manager lock held). - /** @return true if the table (specified by oid) is shared intention exclusive locked by this transaction */ - auto IsTableSharedIntentionExclusiveLocked(const table_oid_t &oid) -> bool { - return six_table_lock_set_->find(oid) != six_table_lock_set_->end(); - } + /** The state of this transaction. */ + std::atomic state_{TransactionState::RUNNING}; - /** @return the current state of the transaction */ - inline auto GetState() -> TransactionState { return state_; } + /** The read ts */ + std::atomic read_ts_{0}; - inline auto LockTxn() -> void { latch_.lock(); } + /** The commit ts */ + std::atomic commit_ts_{INVALID_TS}; - inline auto UnlockTxn() -> void { latch_.unlock(); } + /** The latch for this transaction for accessing txn-level undo logs, protecting all fields below. */ + std::mutex latch_; /** - * Set the state of the transaction. - * @param state new state + * @brief Store undo logs. Other undo logs / table heap will store (txn_id, index) pairs, and therefore + * you should only append to this vector or update things in-place without removing anything. */ - inline void SetState(TransactionState state) { state_ = state; } + std::vector undo_logs_; - /** @return the previous LSN */ - inline auto GetPrevLSN() -> lsn_t { return prev_lsn_; } + /** stores the RID of write tuples */ + std::unordered_map> write_set_; + /** store all scan predicates */ + std::vector scan_predicates_; - /** - * Set the previous LSN. - * @param prev_lsn new previous lsn - */ - inline void SetPrevLSN(lsn_t prev_lsn) { prev_lsn_ = prev_lsn; } + // The below fields are set when a txn is created and will NEVER be changed. - private: - /** The current transaction state. */ - TransactionState state_{TransactionState::GROWING}; /** The isolation level of the transaction. */ - IsolationLevel isolation_level_; - /** The thread ID, used in single-threaded transactions. */ - std::thread::id thread_id_; - /** The ID of this transaction. */ - txn_id_t txn_id_; + const IsolationLevel isolation_level_; - /** The undo set of table tuples. */ - std::shared_ptr> table_write_set_; - /** The undo set of indexes. */ - std::shared_ptr> index_write_set_; - /** The LSN of the last record written by the transaction. */ - lsn_t prev_lsn_; - - /** The latch for this transaction */ - std::mutex latch_; + /** The thread ID which the txn starts from. */ + const std::thread::id thread_id_; - /** Concurrent index: the pages that were latched during index operation. */ - std::shared_ptr> page_set_; - /** Concurrent index: the page IDs that were deleted during index operation.*/ - std::shared_ptr> deleted_page_set_; - - /** LockManager: the set of table locks held by this transaction. */ - std::shared_ptr> s_table_lock_set_; - std::shared_ptr> x_table_lock_set_; - std::shared_ptr> is_table_lock_set_; - std::shared_ptr> ix_table_lock_set_; - std::shared_ptr> six_table_lock_set_; - - /** LockManager: the set of row locks held by this transaction. */ - std::shared_ptr>> s_row_lock_set_; - std::shared_ptr>> x_row_lock_set_; + /** The ID of this transaction. */ + const txn_id_t txn_id_; }; } // namespace bustub @@ -403,11 +208,36 @@ struct fmt::formatter : formatter { case IsolationLevel::READ_UNCOMMITTED: name = "READ_UNCOMMITTED"; break; - case IsolationLevel::READ_COMMITTED: - name = "READ_COMMITTED"; + case IsolationLevel::SNAPSHOT_ISOLATION: + name = "SNAPSHOT_ISOLATION"; + break; + case IsolationLevel::SERIALIZABLE: + name = "SERIALIZABLE"; + break; + } + return formatter::format(name, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto format(bustub::TransactionState x, FormatContext &ctx) const { + using bustub::TransactionState; + string_view name = "unknown"; + switch (x) { + case TransactionState::RUNNING: + name = "RUNNING"; + break; + case TransactionState::ABORTED: + name = "ABORTED"; + break; + case TransactionState::COMMITTED: + name = "COMMITTED"; break; - case IsolationLevel::REPEATABLE_READ: - name = "REPEATABLE_READ"; + case TransactionState::TAINTED: + name = "TAINTED"; break; } return formatter::format(name, ctx); diff --git a/src/include/concurrency/transaction_manager.h b/src/include/concurrency/transaction_manager.h index 55a909ff2..476e581dc 100644 --- a/src/include/concurrency/transaction_manager.h +++ b/src/include/concurrency/transaction_manager.h @@ -13,164 +13,120 @@ #pragma once #include +#include +#include +#include // NOLINT +#include #include #include #include -#include "catalog/catalog.h" +#include "catalog/schema.h" #include "common/config.h" -#include "concurrency/lock_manager.h" #include "concurrency/transaction.h" +#include "concurrency/watermark.h" #include "recovery/log_manager.h" - -// If this is defined, project 4 related logic will be enabled. -// #define BUSTUB_REFSOL_HACK_TXN_MANAGER_IMPLEMENTED +#include "storage/table/tuple.h" namespace bustub { -class LockManager; + +struct VersionUndoLink { + /** The next version in the version chain. */ + UndoLink prev_; + /** Whether a transaction is modifying the version link. Fall 2023: you do not need to read / write this field until + * task 4.2. */ + bool in_progress_{false}; + + friend auto operator==(const VersionUndoLink &a, const VersionUndoLink &b) { + return a.prev_ == b.prev_ && a.in_progress_ == b.in_progress_; + } + + friend auto operator!=(const VersionUndoLink &a, const VersionUndoLink &b) { return !(a == b); } + + inline static auto FromOptionalUndoLink(std::optional undo_link) -> std::optional { + if (undo_link.has_value()) { + return VersionUndoLink{*undo_link}; + } + return std::nullopt; + } +}; /** * TransactionManager keeps track of all the transactions running in the system. */ class TransactionManager { public: - explicit TransactionManager(LockManager *lock_manager, LogManager *log_manager = nullptr) - : lock_manager_(lock_manager), log_manager_(log_manager) {} - + TransactionManager() = default; ~TransactionManager() = default; /** * Begins a new transaction. - * @param txn an optional transaction object to be initialized, otherwise a new transaction is created. * @param isolation_level an optional isolation level of the transaction. * @return an initialized transaction */ - auto Begin(Transaction *txn = nullptr, IsolationLevel isolation_level = IsolationLevel::REPEATABLE_READ) - -> Transaction * { - if (txn == nullptr) { - txn = new Transaction(next_txn_id_++, isolation_level); - } - - if (enable_logging) { - LogRecord record = LogRecord(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::BEGIN); - lsn_t lsn = log_manager_->AppendLogRecord(&record); - txn->SetPrevLSN(lsn); - } - - std::unique_lock l(txn_map_mutex_); - txn_map_[txn->GetTransactionId()] = txn; - return txn; - } + auto Begin(IsolationLevel isolation_level = IsolationLevel::SNAPSHOT_ISOLATION) -> Transaction *; /** * Commits a transaction. - * @param txn the transaction to commit + * @param txn the transaction to commit, the txn will be managed by the txn manager so no need to delete it by + * yourself */ - void Commit(Transaction *txn); + auto Commit(Transaction *txn) -> bool; /** * Aborts a transaction - * @param txn the transaction to abort + * @param txn the transaction to abort, the txn will be managed by the txn manager so no need to delete it by yourself */ void Abort(Transaction *txn); - /** - * Global list of running transactions - */ + auto UpdateVersionLink(RID rid, std::optional prev_version, + std::function)> &&check = nullptr) -> bool; - /** The transaction map is a global list of all the running transactions in the system. */ - std::unordered_map txn_map_; - std::shared_mutex txn_map_mutex_; + /** The same as `GetVersionLink`, except that we extracted the undo link field out. */ + auto GetUndoLink(RID rid) -> std::optional; - /** - * Locates and returns the transaction with the given transaction ID. - * @param txn_id the id of the transaction to be found, it must exist! - * @return the transaction with the given transaction id - */ - auto GetTransaction(txn_id_t txn_id) -> Transaction * { - std::shared_lock l(txn_map_mutex_); - assert(txn_map_.find(txn_id) != txn_map_.end()); - auto *res = txn_map_[txn_id]; - assert(res != nullptr); - return res; - } + /** You only need this starting task 4.2 */ + auto GetVersionLink(RID rid) -> std::optional; - /** Prevents all transactions from performing operations, used for checkpointing. */ - void BlockAllTransactions(); + auto GetUndoLogOptional(UndoLink link) -> std::optional; - /** Resumes all transactions, used for checkpointing. */ - void ResumeTransactions(); + auto GetUndoLog(UndoLink link) -> UndoLog; - Catalog *catalog_; + auto GetWatermark() -> timestamp_t { return running_txns_.GetWatermark(); } - /** - * Set we're in terrier bench mode - */ - inline void SetTerrier() { terrier_ = true; } - - /** - * Get if we're in terrier bench mode - * @return boolean indicating terrier mode - */ - inline auto GetTerrier() -> bool { return terrier_; } - - private: - /** - * Releases all the locks held by the given transaction. - * @param txn the transaction whose locks should be released - */ - void ReleaseLocks(Transaction *txn) { - /** Drop all row locks */ - txn->LockTxn(); - std::unordered_map> row_lock_set; - for (const auto &s_row_lock_set : *txn->GetSharedRowLockSet()) { - for (auto rid : s_row_lock_set.second) { - row_lock_set[s_row_lock_set.first].emplace(rid); - } - } - for (const auto &x_row_lock_set : *txn->GetExclusiveRowLockSet()) { - for (auto rid : x_row_lock_set.second) { - row_lock_set[x_row_lock_set.first].emplace(rid); - } - } - - /** Drop all table locks */ - std::unordered_set table_lock_set; - for (auto oid : *txn->GetSharedTableLockSet()) { - table_lock_set.emplace(oid); - } - for (table_oid_t oid : *(txn->GetIntentionSharedTableLockSet())) { - table_lock_set.emplace(oid); - } - for (auto oid : *txn->GetExclusiveTableLockSet()) { - table_lock_set.emplace(oid); - } - for (auto oid : *txn->GetIntentionExclusiveTableLockSet()) { - table_lock_set.emplace(oid); - } - for (auto oid : *txn->GetSharedIntentionExclusiveTableLockSet()) { - table_lock_set.emplace(oid); - } - txn->UnlockTxn(); - - for (const auto &locked_table_row_set : row_lock_set) { - table_oid_t oid = locked_table_row_set.first; - for (auto rid : locked_table_row_set.second) { - lock_manager_->UnlockRow(txn, oid, rid); - } - } + void GarbageCollection(); - for (auto oid : table_lock_set) { - lock_manager_->UnlockTable(txn, oid); - } - } - - std::atomic next_txn_id_{0}; - LockManager *lock_manager_ __attribute__((__unused__)); - LogManager *log_manager_ __attribute__((__unused__)); + /** protects txn map */ + std::shared_mutex txn_map_mutex_; + /** All transactions, running or committed */ + std::unordered_map> txn_map_; + + struct PageVersionInfo { + /** protects the map */ + std::shared_mutex mutex_; + /** Stores previous version info for all slots. Note: DO NOT use `[x]` to access it because + * it will create new elements even if it does not exist. Use `find` instead. + */ + std::unordered_map prev_version_; + }; + + /** protects version info */ + std::shared_mutex version_info_mutex_; + /** Stores the previous version of each tuple in the table heap. */ + std::unordered_map> version_info_; + + /** Stores all the read_ts of running txns so as to facilitate garbage collection. */ + Watermark running_txns_{0}; + + /** Only one txn is allowed to commit at a time */ + std::mutex commit_mutex_; + /** The last committed timestamp. */ + std::atomic last_commit_ts_{0}; + + /** Catalog */ + Catalog *catalog_; - /** Terrier Bench Hack */ - bool terrier_{false}; + std::atomic next_txn_id_{TXN_START_ID}; }; } // namespace bustub diff --git a/src/include/concurrency/watermark.h b/src/include/concurrency/watermark.h new file mode 100644 index 000000000..c171cd668 --- /dev/null +++ b/src/include/concurrency/watermark.h @@ -0,0 +1,40 @@ +#pragma once + +#include + +#include "concurrency/transaction.h" +#include "storage/table/tuple.h" + +namespace bustub { + +/** + * @brief tracks all the read timestamps. + * + */ +class Watermark { + public: + explicit Watermark(timestamp_t commit_ts) : commit_ts_(commit_ts), watermark_(commit_ts) {} + + auto AddTxn(timestamp_t read_ts) -> void; + + auto RemoveTxn(timestamp_t read_ts) -> void; + + /** The caller should update commit ts before removing the txn from the watermark so that we can track watermark + * correctly. */ + auto UpdateCommitTs(timestamp_t commit_ts) { commit_ts_ = commit_ts; } + + auto GetWatermark() -> timestamp_t { + if (current_reads_.empty()) { + return commit_ts_; + } + return watermark_; + } + + timestamp_t commit_ts_; + + timestamp_t watermark_; + + std::unordered_map current_reads_; +}; + +}; // namespace bustub diff --git a/src/include/execution/execution_common.h b/src/include/execution/execution_common.h new file mode 100644 index 000000000..584f8afdb --- /dev/null +++ b/src/include/execution/execution_common.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include + +#include "catalog/catalog.h" +#include "catalog/schema.h" +#include "concurrency/transaction.h" +#include "storage/table/tuple.h" + +namespace bustub { + +auto ReconstructTuple(const Schema *schema, const Tuple &base_tuple, const TupleMeta &base_meta, + const std::vector &undo_logs) -> std::optional; + +void TxnMgrDbg(const std::string &info, TransactionManager *txn_mgr, const TableInfo *table_info, + TableHeap *table_heap); + +// Add new functions as needed... You are likely need to define some more functions. +// +// To give you a sense of what can be shared across executors / transaction manager, here are the +// list of helper function names that we defined in the reference solution. You should come up with +// your own when you go through the process. +// * CollectUndoLogs +// * WalkUndoLogs +// * Modify +// * IsWriteWriteConflict +// * GenerateDiffLog +// * GenerateNullTupleForSchema +// * GetUndoLogSchema +// +// We do not provide the signatures for these functions because it depends on the your implementation +// of other parts of the system. You do not need to define the same set of helper functions in +// your implementation. Please add your own ones as necessary so that you do not need to write +// the same code everywhere. + +} // namespace bustub diff --git a/src/include/storage/table/table_heap.h b/src/include/storage/table/table_heap.h index 2d85ca3d5..cfcbca822 100644 --- a/src/include/storage/table/table_heap.h +++ b/src/include/storage/table/table_heap.h @@ -93,7 +93,8 @@ class TableHeap { * @param[out] rid the rid of the tuple to be updated */ auto UpdateTupleInPlace(const TupleMeta &meta, const Tuple &tuple, RID rid, - std::function &&check) -> bool; + std::function &&check = nullptr) + -> bool; /** For binder tests */ static auto CreateEmptyHeap(bool create_table_heap = false) -> std::unique_ptr { @@ -103,15 +104,6 @@ class TableHeap { } private: - /** - * Insert a tuple into the table. If the tuple is too large (>= page_size), return std::nullopt. - * @param meta tuple meta - * @param tuple tuple to insert - * @return rid of the inserted tuple - */ - auto InsertTupleDeferredUnlock(const TupleMeta &meta, const Tuple &tuple) - -> std::pair>; - /** Used for binder tests */ explicit TableHeap(bool create_table_heap = false); diff --git a/src/include/storage/table/tuple.h b/src/include/storage/table/tuple.h index a0d634d3a..9305f9733 100644 --- a/src/include/storage/table/tuple.h +++ b/src/include/storage/table/tuple.h @@ -60,6 +60,8 @@ class Tuple { // constructor for table heap tuple explicit Tuple(RID rid) : rid_(rid) {} + static auto Empty() -> Tuple { return Tuple{RID{INVALID_PAGE_ID, 0}}; } + // constructor for creating a new tuple based on input value Tuple(std::vector values, const Schema *schema); diff --git a/src/include/type/value.h b/src/include/type/value.h index b539e822c..d4a991e4b 100644 --- a/src/include/type/value.h +++ b/src/include/type/value.h @@ -93,6 +93,13 @@ class Value { inline auto CastAs(const TypeId type_id) const -> Value { return Type::GetInstance(type_id_)->CastAs(*this, type_id); } + // You will likely need this in project 4... + inline auto CompareExactlyEquals(const Value &o) const -> bool { + if (this->IsNull() && o.IsNull()) { + return true; + } + return (Type::GetInstance(type_id_)->CompareEquals(*this, o)) == CmpBool::CmpTrue; + } // Comparison Methods inline auto CompareEquals(const Value &o) const -> CmpBool { return Type::GetInstance(type_id_)->CompareEquals(*this, o); diff --git a/src/recovery/CMakeLists.txt b/src/recovery/CMakeLists.txt index f952039e6..2454c91f9 100644 --- a/src/recovery/CMakeLists.txt +++ b/src/recovery/CMakeLists.txt @@ -1,7 +1,6 @@ add_library( bustub_recovery OBJECT - checkpoint_manager.cpp log_manager.cpp) set(ALL_OBJECT_FILES diff --git a/src/storage/table/table_heap.cpp b/src/storage/table/table_heap.cpp index cea49fd14..63491f6f5 100644 --- a/src/storage/table/table_heap.cpp +++ b/src/storage/table/table_heap.cpp @@ -38,48 +38,6 @@ TableHeap::TableHeap(BufferPoolManager *bpm) : bpm_(bpm) { TableHeap::TableHeap(bool create_table_heap) : bpm_(nullptr) {} -auto TableHeap::InsertTupleDeferredUnlock(const TupleMeta &meta, const Tuple &tuple) - -> std::pair> { - std::unique_lock guard(latch_); - auto page_guard = bpm_->FetchPageWrite(last_page_id_); - while (true) { - auto page = page_guard.AsMut(); - if (page->GetNextTupleOffset(meta, tuple) != std::nullopt) { - break; - } - - // if there's no tuple in the page, and we can't insert the tuple, then this tuple is too large. - BUSTUB_ENSURE(page->GetNumTuples() != 0, "tuple is too large, cannot insert"); - - page_id_t next_page_id = INVALID_PAGE_ID; - auto npg = bpm_->NewPage(&next_page_id); - BUSTUB_ENSURE(next_page_id != INVALID_PAGE_ID, "cannot allocate page"); - - page->SetNextPageId(next_page_id); - - auto next_page = reinterpret_cast(npg->GetData()); - next_page->Init(); - - page_guard.Drop(); - - // acquire latch here as TSAN complains. Given we only have one insertion thread, this is fine. - npg->WLatch(); - auto next_page_guard = WritePageGuard{bpm_, npg}; - - last_page_id_ = next_page_id; - page_guard = std::move(next_page_guard); - } - auto last_page_id = last_page_id_; - - auto page = page_guard.AsMut(); - auto slot_id = *page->InsertTuple(meta, tuple); - - // only allow one insertion at a time, otherwise it will deadlock. - guard.unlock(); - - return std::make_pair(std::move(page_guard), RID(last_page_id, slot_id)); -} - auto TableHeap::InsertTuple(const TupleMeta &meta, const Tuple &tuple, LockManager *lock_mgr, Transaction *txn, table_oid_t oid) -> std::optional { std::unique_lock guard(latch_); @@ -158,9 +116,7 @@ auto TableHeap::MakeIterator() -> TableIterator { auto page_guard = bpm_->FetchPageRead(last_page_id); auto page = page_guard.As(); - auto num_tuples = page->GetNumTuples(); - page_guard.Drop(); - return {this, {first_page_id_, 0}, {last_page_id, num_tuples}}; + return {this, {first_page_id_, 0}, {last_page_id, page->GetNumTuples()}}; } auto TableHeap::MakeEagerIterator() -> TableIterator { return {this, {first_page_id_, 0}, {INVALID_PAGE_ID, 0}}; } @@ -171,7 +127,7 @@ auto TableHeap::UpdateTupleInPlace(const TupleMeta &meta, const Tuple &tuple, RI auto page_guard = bpm_->FetchPageWrite(rid.GetPageId()); auto page = page_guard.AsMut(); auto [old_meta, old_tup] = page->GetTuple(rid); - if (check(old_meta, old_tup, rid)) { + if (check == nullptr || check(old_meta, old_tup, rid)) { page->UpdateTupleInPlaceUnsafe(meta, tuple, rid); return true; } diff --git a/test/concurrency/deadlock_detection_test.cpp b/test/concurrency/deadlock_detection_test.cpp.disabled similarity index 100% rename from test/concurrency/deadlock_detection_test.cpp rename to test/concurrency/deadlock_detection_test.cpp.disabled diff --git a/test/concurrency/lock_manager_test.cpp b/test/concurrency/lock_manager_test.cpp.disabled similarity index 100% rename from test/concurrency/lock_manager_test.cpp rename to test/concurrency/lock_manager_test.cpp.disabled diff --git a/test/concurrency/txn_integration_test.cpp b/test/concurrency/txn_integration_test.cpp.disabled similarity index 100% rename from test/concurrency/txn_integration_test.cpp rename to test/concurrency/txn_integration_test.cpp.disabled diff --git a/test/txn/txn_common.h b/test/txn/txn_common.h new file mode 100644 index 000000000..6058471ca --- /dev/null +++ b/test/txn/txn_common.h @@ -0,0 +1,413 @@ +#pragma once + +#include +#include // NOLINT +#include +#include +#include +#include // NOLINT +#include +#include +#include +#include +#include // NOLINT +#include + +#include "buffer/buffer_pool_manager.h" +#include "catalog/catalog.h" +#include "catalog/column.h" +#include "catalog/schema.h" +#include "common/bustub_instance.h" +#include "common/config.h" +#include "common/util/string_util.h" +#include "concurrency/transaction.h" +#include "concurrency/transaction_manager.h" +#include "concurrency/watermark.h" +#include "execution/execution_common.h" +#include "fmt/core.h" +#include "fmt/format.h" +#include "gtest/gtest.h" +#include "storage/disk/disk_manager_memory.h" +#include "storage/index/b_plus_tree.h" +#include "storage/table/tuple.h" +#include "test_util.h" // NOLINT +#include "type/type.h" +#include "type/type_id.h" +#include "type/value_factory.h" + +namespace bustub { + +auto Int(uint32_t data) -> Value { return ValueFactory::GetIntegerValue(data); } + +auto IntNull() -> Value { return ValueFactory::GetNullValueByType(TypeId::INTEGER); } + +auto Double(double data) -> Value { return ValueFactory::GetDecimalValue(data); } + +auto DoubleNull() -> Value { return ValueFactory::GetNullValueByType(TypeId::DECIMAL); } + +auto Bool(bool data) -> Value { return ValueFactory::GetBooleanValue(data); } + +auto BoolNull() -> Value { return ValueFactory::GetNullValueByType(TypeId::BOOLEAN); } + +auto CompareValue(const Value &v1, const Value &v2) -> bool { + return v1.CompareEquals(v2) == CmpBool::CmpTrue || (v1.IsNull() && v2.IsNull()); +} + +void VerifyTuple(const Schema *schema, const Tuple &tuple, const std::vector &values) { + for (uint32_t i = 0; i < schema->GetColumnCount(); i++) { + auto actual_val = tuple.GetValue(schema, i); + auto &expected_val = values[i]; + if (!CompareValue(actual_val, expected_val)) { + std::cerr << "tuple data mismatch: got " << tuple.ToString(schema) << "; at column " << i << ", actual value " + << actual_val.ToString() << " != expected value " << expected_val.ToString() << std::endl; + std::terminate(); + } + } +} + +template +auto ResultVecToString(const std::vector> &result) -> std::string { + std::vector rows; + for (const auto &row : result) { + if (row.empty()) { + rows.emplace_back(""); + } else { + rows.emplace_back(fmt::format("{}", fmt::join(row, ", "))); + } + } + if (result.empty()) { + rows.emplace_back(""); + } + return fmt::format("{}", fmt::join(rows, "\n")); +} + +template +auto ResultVecToVecString(const std::vector> &result) -> std::vector> { + std::vector> rows; + for (const auto &row : result) { + rows.emplace_back(); + for (const auto &col : row) { + rows.back().push_back(fmt::format("{}", col)); + } + } + return rows; +} + +template +auto VecToVecString(const std::vector &result) -> std::vector { + std::vector rows; + for (const auto &col : result) { + rows.emplace_back(fmt::format("{}", col)); + } + return rows; +} + +template +auto ExpectResult(const std::vector> &actual_result, + const std::vector> &expected_result, bool show_result, bool newline = true) -> bool { + auto actual_result_rows = ResultVecToVecString(actual_result); + auto expected_result_rows = ResultVecToVecString(expected_result); + std::sort(actual_result_rows.begin(), actual_result_rows.end()); + std::sort(expected_result_rows.begin(), expected_result_rows.end()); + if (actual_result_rows != expected_result_rows) { + auto expected_result_str = ResultVecToString(expected_result); + auto actual_result_str = ResultVecToString(actual_result); + if (show_result) { + if (!newline) { + fmt::println(stderr, "-- error :("); + } + fmt::print(stderr, "ERROR: result mismatch\n--- EXPECTED ---\n{}\n\n--- ACTUAL ---\n{}\n\n", expected_result_str, + actual_result_str); + } else { + if (!newline) { + fmt::println(stderr, "-- error :("); + } + fmt::print(stderr, "ERROR: result mismatch\n--- EXPECTED ---\n\n\n--- ACTUAL ---\n{}\n\n", + actual_result_str); + } + } else { + if (!newline) { + fmt::print(stderr, "-"); + } + fmt::print(stderr, "- pass check :)\n"); + } + return actual_result_rows == expected_result_rows; +} + +template +void Query(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, const std::string &query, + const std::vector> &expected_rows, bool show_result) { + std::stringstream ss; + auto writer = bustub::StringVectorWriter(); + fmt::print(stderr, "- query var={} id={} status={} read_ts={} query=\"{}\" ", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), query); + if (txn->GetTransactionState() != TransactionState::RUNNING) { + fmt::println(stderr, "txn not running"); + std::terminate(); + } + if (!instance.ExecuteSqlTxn(query, writer, txn)) { + std::cerr << "failed to execute sql" << std::endl; + std::terminate(); + } + if (!ExpectResult(writer.values_, expected_rows, show_result, false)) { + std::terminate(); + } +} + +template +void QueryHideResult(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, + const std::string &query, const std::vector> &expected_rows) { + Query(instance, txn_var_name, txn, query, expected_rows, false); +} + +template +void QueryShowResult(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, + const std::string &query, const std::vector> &expected_rows) { + Query(instance, txn_var_name, txn, query, expected_rows, true); +} + +void CheckUndoLogNum(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, size_t expected_num) { + fmt::println(stderr, "- check_undo_log var={} id={} status={} read_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs()); + auto undo_log_num = txn->GetUndoLogNum(); + if (undo_log_num != expected_num) { + fmt::println(stderr, "Error: expected to have {} undo logs in txn, found {}", expected_num, undo_log_num); + std::terminate(); + } +} + +void CheckUndoLogColumn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, + size_t expected_columns) { + fmt::println(stderr, "- check_undo_log var={} id={} status={} read_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs()); + auto undo_log_num = txn->GetUndoLogNum(); + if (undo_log_num != 1) { + fmt::println(stderr, "Error: expected to have {} undo logs in txn, found {}", 1, undo_log_num); + std::terminate(); + } + auto undo_log = txn->GetUndoLog(0); + size_t cnt = + std::count_if(undo_log.modified_fields_.begin(), undo_log.modified_fields_.end(), [](const auto x) { return x; }); + if (cnt != expected_columns) { + fmt::println(stderr, "Error: expected undo log to have {} columns, found {} (modified_fields=[{}])", + expected_columns, cnt, fmt::join(undo_log.modified_fields_, ",")); + std::terminate(); + } +} + +#define WithTxn(txn, func) \ + { \ + [[maybe_unused]] const std::string &_var = (#txn); \ + [[maybe_unused]] Transaction *_txn = txn; \ + func; \ + } + +void Execute(BustubInstance &instance, const std::string &query) { + fmt::println(stderr, "- execute sql=\"{}\"", query); + NoopWriter writer; + if (!instance.ExecuteSql(query, writer)) { + std::cerr << "failed to execute sql" << std::endl; + std::terminate(); + } +} + +auto TableHeapEntry(BustubInstance &instance, TableInfo *table_info) -> size_t { + auto table_heap = table_info->table_.get(); + auto table_iter = table_heap->MakeEagerIterator(); + size_t cnt = 0; + while (!table_iter.IsEnd()) { + ++table_iter; + cnt++; + } + return cnt; +} + +void TableHeapEntryNoMoreThan(BustubInstance &instance, TableInfo *table_info, size_t upper_limit) { + fmt::print(stderr, "- verify table heap"); + auto cnt = TableHeapEntry(instance, table_info); + if (cnt > upper_limit) { + fmt::println(stderr, " -- error: expect table heap to contain at most {} elements, found {}", upper_limit, cnt); + std::terminate(); + } + fmt::println(stderr, "- verify table heap"); +} + +void ExecuteTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, const std::string &query) { + fmt::println(stderr, "- execute var={} id={} status={} read_ts={} sql=\"{}\"", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), query); + if (txn->GetTransactionState() != TransactionState::RUNNING) { + fmt::println(stderr, "txn not running"); + std::terminate(); + } + NoopWriter writer; + if (!instance.ExecuteSqlTxn(query, writer, txn)) { + std::cerr << "failed to execute sql" << std::endl; + std::terminate(); + } +} + +auto BeginTxn(BustubInstance &instance, const std::string &txn_var_name) -> Transaction * { + auto txn = instance.txn_manager_->Begin(); + fmt::println(stderr, "- txn_begin var={} id={} status={} read_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs()); + return txn; +} + +auto CommitTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) { + if (txn->GetTransactionState() != TransactionState::RUNNING) { + fmt::println(stderr, "txn not running"); + std::terminate(); + } + if (!instance.txn_manager_->Commit(txn)) { + fmt::println(stderr, "failed to commit txn: var={} id={}", txn_var_name, txn->GetTransactionId()); + std::terminate(); + } + if (txn->GetTransactionState() != TransactionState::COMMITTED) { + fmt::println(stderr, "should set to committed state var={} id={}", txn_var_name, txn->GetTransactionId()); + std::terminate(); + } + fmt::println(stderr, "- txn_commit var={} id={} status={} read_ts={} commit_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), txn->GetCommitTs()); +} + +auto CheckTainted(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) { + if (txn->GetTransactionState() != TransactionState::TAINTED) { + fmt::println(stderr, "should set to tainted state var={} id={}", txn_var_name, txn->GetTransactionId()); + std::terminate(); + } + fmt::println(stderr, "- txn_tainted var={} id={} status={} read_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs()); +} + +auto CommitTaintedTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) { + if (instance.txn_manager_->Commit(txn)) { + fmt::println(stderr, "should not commit var={} id={}", txn_var_name, txn->GetTransactionId()); + std::terminate(); + } + if (txn->GetTransactionState() != TransactionState::TAINTED) { + fmt::println(stderr, "should set to tainted state var={} id={}", txn_var_name, txn->GetTransactionId()); + std::terminate(); + } + fmt::println(stderr, "- txn_committed_tainted var={} id={} status={} read_ts={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs()); +} + +void ExecuteTxnTainted(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, + const std::string &query) { + fmt::println(stderr, "- execute var={} id={} status={} read_ts={} sql=\"{}\"", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), query); + NoopWriter writer; + if (instance.ExecuteSqlTxn(query, writer, txn)) { + std::cerr << "sql should not execute successfully -- there should be a write-write conflict" << std::endl; + std::terminate(); + } + CheckTainted(instance, txn_var_name, txn); +} + +void GarbageCollection(BustubInstance &instance) { + fmt::println(stderr, "- garbage_collection"); + instance.txn_manager_->GarbageCollection(); +} + +void EnsureTxnGCed(BustubInstance &instance, const std::string &txn_var_name, txn_id_t txn_id) { + fmt::println(stderr, "- ensure_txn_gc_ed var={} id={} watermark={}", txn_var_name, txn_id ^ TXN_START_ID, + instance.txn_manager_->GetWatermark()); + const auto &txn_map = instance.txn_manager_->txn_map_; + if (txn_map.find(txn_id) != txn_map.end()) { + std::cerr << "txn not garbage collected" << std::endl; + std::terminate(); + } +} + +void EnsureTxnExists(BustubInstance &instance, const std::string &txn_var_name, txn_id_t txn_id) { + fmt::println(stderr, "- ensure_txn_exists var={} id={} watermark={}", txn_var_name, txn_id ^ TXN_START_ID, + instance.txn_manager_->GetWatermark()); + const auto &txn_map = instance.txn_manager_->txn_map_; + if (txn_map.find(txn_id) == txn_map.end()) { + std::cerr << "txn not exist" << std::endl; + std::terminate(); + } +} + +void BumpCommitTs(BustubInstance &instance, int by = 1) { + auto before_commit_ts = instance.txn_manager_->last_commit_ts_.load(); + for (int i = 0; i < by; i++) { + auto txn = instance.txn_manager_->Begin(); + instance.txn_manager_->Commit(txn); + } + auto after_commit_ts = instance.txn_manager_->last_commit_ts_.load(); + fmt::println(stderr, "- bump_commit_ts from={} to={} watermark={}", before_commit_ts, after_commit_ts, + instance.txn_manager_->GetWatermark()); +} + +void EnsureIndexScan(BustubInstance &instance) { + global_disable_execution_exception_print.store(true); + fmt::println(stderr, "- pre-test index validation"); + NoopWriter writer; + auto res = instance.ExecuteSql("CREATE TABLE pk_test_table_n(pk int primary key)", writer); + if (!res) { + std::terminate(); + } + res = instance.ExecuteSql("set force_optimizer_starter_rule=yes", writer); + if (!res) { + std::terminate(); + } + + { + std::stringstream ss; + SimpleStreamWriter writer_ss(ss); + auto *txn = instance.txn_manager_->Begin(); + res = instance.ExecuteSqlTxn("EXPLAIN SELECT * FROM pk_test_table_n WHERE pk = 1", writer_ss, txn); + if (!StringUtil::Contains(ss.str(), "IndexScan")) { + fmt::println("index scan not found in plan:\n{}\n", ss.str()); + std::terminate(); + } + } + + global_disable_execution_exception_print.store(false); +} + +template +void QueryIndex(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, const std::string &query, + const std::string &pk_column, const std::vector &expected_pk, + const std::vector> &expected_rows) { + assert(expected_pk.size() == expected_rows.size()); + fmt::print(stderr, "- query_index var={} id={} status={} read_ts={} query=\"{}\" pk={}", txn_var_name, + txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), query, + fmt::join(expected_pk, ",")); + if (txn->GetTransactionState() != TransactionState::RUNNING) { + fmt::println(stderr, "txn not running"); + std::terminate(); + } + for (size_t i = 0; i < expected_pk.size(); i++) { + StringVectorWriter writer; + auto res = instance.ExecuteSqlTxn(fmt::format("{} WHERE {} = {}", query, pk_column, expected_pk[i]), writer, txn); + if (!res) { + fmt::println("failed to execute query when {} = {}", pk_column, expected_pk[i]); + std::terminate(); + } + if (expected_rows[i].empty()) { + if (!writer.values_.empty()) { + fmt::println("ERROR: expect {} = {} to have 0 rows, found ({})", pk_column, expected_pk[i], + fmt::join(writer.values_[0], ",")); + std::terminate(); + } + } else { + if (writer.values_.size() != 1) { + fmt::println("ERROR: expect {} = {} to have 1 row, found {} rows", pk_column, expected_pk[i], + writer.values_.size()); + std::terminate(); + } + if (writer.values_[0] != VecToVecString(expected_rows[i])) { + fmt::println("ERROR: expect {} = {} to be ({}), found ({})", pk_column, expected_pk[i], + fmt::join(expected_rows[0], ","), fmt::join(writer.values_[0], ",")); + std::terminate(); + } + } + } + fmt::println(" -- pass check :)"); +} + +using IntResult = std::vector>; + +} // namespace bustub diff --git a/test/txn/txn_executor_test.cpp b/test/txn/txn_executor_test.cpp new file mode 100644 index 000000000..c4ea42ac9 --- /dev/null +++ b/test/txn/txn_executor_test.cpp @@ -0,0 +1,218 @@ +#include "execution/execution_common.h" +#include "txn_common.h" // NOLINT + +namespace bustub { + +// NOLINTBEGIN(bugprone-unchecked-optional-access) + +TEST(TxnExecutorTest, DISABLED_InsertTest) { // NOLINT + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE maintable(a int)"); + auto table_info = bustub->catalog_->GetTable("maintable"); + auto txn1 = BeginTxn(*bustub, "txn1"); + auto txn2 = BeginTxn(*bustub, "txn2"); + + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (1)")); + WithTxn(txn2, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (2)")); + + TxnMgrDbg("after insertion", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + + const std::string query = "SELECT a FROM maintable"; + fmt::println(stderr, "A: check scan txn1"); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1}})); +} + +TEST(TxnExecutorTest, DISABLED_InsertCommitTest) { // NOLINT + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE maintable(a int)"); + auto table_info = bustub->catalog_->GetTable("maintable"); + auto txn1 = BeginTxn(*bustub, "txn1"); + auto txn2 = BeginTxn(*bustub, "txn2"); + + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (1)")); + WithTxn(txn2, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (2)")); + + TxnMgrDbg("after insertion", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + + const std::string query = "SELECT a FROM maintable"; + fmt::println(stderr, "A: check scan txn1"); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1}})); + fmt::println(stderr, "B: check scan txn2"); + WithTxn(txn2, QueryShowResult(*bustub, _var, _txn, query, IntResult{{2}})); + WithTxn(txn1, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after commit txn1", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + auto txn3 = BeginTxn(*bustub, "txn3"); + fmt::println(stderr, "C: check scan txn3"); + WithTxn(txn3, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1}})); +} + +TEST(TxnExecutorTest, DISABLED_InsertDeleteTest) { // NOLINT + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE maintable(a int)"); + auto table_info = bustub->catalog_->GetTable("maintable"); + auto txn1 = BeginTxn(*bustub, "txn1"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (1)")); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (2)")); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (3)")); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "DELETE FROM maintable WHERE a = 3")); + TxnMgrDbg("after 3 insert + 1 delete", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + fmt::println(stderr, "A: check scan txn1"); + const auto query = "SELECT a FROM maintable"; + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1}, {2}})); + WithTxn(txn1, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); +} + +TEST(TxnExecutorTest, DISABLED_UpdateTest) { // NOLINT + { + fmt::println(stderr, "--- UpdateTest1: no undo log ---"); + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE table1(a int, b int, c int)"); + auto table_info = bustub->catalog_->GetTable("table1"); + auto txn1 = BeginTxn(*bustub, "txn1"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO table1 VALUES (1, 1, 1)")); + TxnMgrDbg("after insert", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + const std::string query = "SELECT * FROM table1"; + fmt::println(stderr, "A: 1st update"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "UPDATE table1 SET b = 2")); + TxnMgrDbg("after update", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1, 2, 1}})); + WithTxn(txn1, CheckUndoLogNum(*bustub, _var, _txn, 0)); + } + { + fmt::println(stderr, "--- UpdateTest2: update applied on insert ---"); + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE table2(a int, b int, c int)"); + auto table_info = bustub->catalog_->GetTable("table2"); + auto txn0 = BeginTxn(*bustub, "txn0"); + WithTxn(txn0, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO table2 VALUES (1, 1, 1)")); + WithTxn(txn0, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after insert and commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + auto txn1 = BeginTxn(*bustub, "txn1"); + auto txn_ref = BeginTxn(*bustub, "txn_ref"); + const std::string query = "SELECT * FROM table2"; + fmt::println(stderr, "A: 1st update"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "UPDATE table2 SET b = 2")); + TxnMgrDbg("after update", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1, 2, 1}})); + WithTxn(txn_ref, QueryShowResult(*bustub, _var, _txn, query, IntResult{{1, 1, 1}})); + WithTxn(txn1, CheckUndoLogColumn(*bustub, _var, _txn, 1)); + } +} + +TEST(TxnExecutorTest, DISABLED_UpdateConflict) { // NOLINT + { + fmt::println(stderr, "--- UpdateConflict1: simple case, insert and two txn update it ---"); + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE table1(a int, b int, c int)"); + auto table_info = bustub->catalog_->GetTable("table1"); + auto txn0 = BeginTxn(*bustub, "txn0"); + WithTxn(txn0, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO table1 VALUES (0, 0, 0)")); + WithTxn(txn0, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after initialize", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + auto txn1 = BeginTxn(*bustub, "txn1"); + auto txn2 = BeginTxn(*bustub, "txn2"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "UPDATE table1 SET a = 1")); + TxnMgrDbg("after 1st update", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn2, ExecuteTxnTainted(*bustub, _var, _txn, "UPDATE table1 SET b = 2")); + TxnMgrDbg("after txn tainted", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn1, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + } +} + +TEST(TxnExecutorTest, DISABLED_GarbageCollection) { // NOLINT + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE table1(a int, b int, c int)"); + auto table_info = bustub->catalog_->GetTable("table1"); + const std::string query = "SELECT * FROM table1"; + auto txn_watermark_at_0 = BeginTxn(*bustub, "txn_watermark_at_0"); + auto txn_watermark_at_0_id = txn_watermark_at_0->GetTransactionId(); + BumpCommitTs(*bustub, 2); + auto txn_a = BeginTxn(*bustub, "txn_a"); + auto txn_a_id = txn_a->GetTransactionId(); + WithTxn(txn_a, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO table1 VALUES (0, 0, 0), (1, 1, 1)")); + WithTxn(txn_a, CommitTxn(*bustub, _var, _txn)); + auto txn_b = BeginTxn(*bustub, "txn_b"); + auto txn_b_id = txn_b->GetTransactionId(); + WithTxn(txn_b, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO table1 VALUES (2, 2, 2), (3, 3, 3)")); + WithTxn(txn_b, CommitTxn(*bustub, _var, _txn)); + BumpCommitTs(*bustub, 2); + auto txn_watermark_at_1 = BeginTxn(*bustub, "txn_watermark_at_1"); + auto txn_watermark_at_1_id = txn_watermark_at_1->GetTransactionId(); + BumpCommitTs(*bustub, 2); + auto txn2 = BeginTxn(*bustub, "txn2"); + auto txn2_id = txn2->GetTransactionId(); + WithTxn(txn2, ExecuteTxn(*bustub, _var, _txn, "UPDATE table1 SET a = a + 10")); + WithTxn(txn2, QueryShowResult(*bustub, _var, _txn, query, IntResult{{10, 0, 0}, {11, 1, 1}, {12, 2, 2}, {13, 3, 3}})); + WithTxn(txn2, CommitTxn(*bustub, _var, _txn)); + BumpCommitTs(*bustub, 2); + auto txn_watermark_at_2 = BeginTxn(*bustub, "txn_watermark_at_2"); + auto txn_watermark_at_2_id = txn_watermark_at_2->GetTransactionId(); + BumpCommitTs(*bustub, 2); + auto txn3 = BeginTxn(*bustub, "txn3"); + auto txn3_id = txn3->GetTransactionId(); + WithTxn(txn3, ExecuteTxn(*bustub, _var, _txn, "UPDATE table1 SET a = a + 10 WHERE a < 12")); + WithTxn(txn3, ExecuteTxn(*bustub, _var, _txn, "DELETE FROM table1 WHERE a = 21")); + WithTxn(txn3, QueryShowResult(*bustub, _var, _txn, query, IntResult{{20, 0, 0}, {12, 2, 2}, {13, 3, 3}})); + WithTxn(txn3, CommitTxn(*bustub, _var, _txn)); + BumpCommitTs(*bustub, 2); + auto txn_watermark_at_3 = BeginTxn(*bustub, "txn_watermark_at_3"); + auto txn_watermark_at_3_id = txn_watermark_at_3->GetTransactionId(); + BumpCommitTs(*bustub, 2); + TxnMgrDbg("after commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + + WithTxn(txn_watermark_at_0, QueryShowResult(*bustub, _var, _txn, query, IntResult{})); + WithTxn(txn_watermark_at_1, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{0, 0, 0}, {1, 1, 1}, {2, 2, 2}, {3, 3, 3}})); + WithTxn(txn_watermark_at_2, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{10, 0, 0}, {11, 1, 1}, {12, 2, 2}, {13, 3, 3}})); + WithTxn(txn_watermark_at_3, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{20, 0, 0}, {12, 2, 2}, {13, 3, 3}})); + + fmt::println(stderr, "A: first GC"); + GarbageCollection(*bustub); + TxnMgrDbg("after garbage collection", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + fmt::println(stderr, "B: second GC"); + GarbageCollection(*bustub); + TxnMgrDbg("after garbage collection (yes, we call it twice without doing anything...)", bustub->txn_manager_.get(), + table_info, table_info->table_.get()); + WithTxn(txn_watermark_at_0, EnsureTxnExists(*bustub, _var, txn_watermark_at_0_id)); + WithTxn(txn_watermark_at_1, EnsureTxnExists(*bustub, _var, txn_watermark_at_1_id)); + WithTxn(txn_watermark_at_2, EnsureTxnExists(*bustub, _var, txn_watermark_at_2_id)); + WithTxn(txn_watermark_at_3, EnsureTxnExists(*bustub, _var, txn_watermark_at_3_id)); + WithTxn(txn_a, EnsureTxnGCed(*bustub, _var, txn_a_id)); + WithTxn(txn_b, EnsureTxnGCed(*bustub, _var, txn_b_id)); + WithTxn(txn2, EnsureTxnExists(*bustub, _var, txn2_id)); + WithTxn(txn3, EnsureTxnExists(*bustub, _var, txn3_id)); + WithTxn(txn_watermark_at_0, QueryShowResult(*bustub, _var, _txn, query, IntResult{})); + WithTxn(txn_watermark_at_1, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{0, 0, 0}, {1, 1, 1}, {2, 2, 2}, {3, 3, 3}})); + WithTxn(txn_watermark_at_2, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{10, 0, 0}, {11, 1, 1}, {12, 2, 2}, {13, 3, 3}})); + WithTxn(txn_watermark_at_3, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{20, 0, 0}, {12, 2, 2}, {13, 3, 3}})); + + fmt::println(stderr, "C: 3rd GC"); + WithTxn(txn_watermark_at_0, CommitTxn(*bustub, _var, _txn)); + GarbageCollection(*bustub); + TxnMgrDbg("after garbage collection", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn_watermark_at_0, EnsureTxnGCed(*bustub, _var, txn_watermark_at_0_id)); + WithTxn(txn_watermark_at_1, EnsureTxnExists(*bustub, _var, txn_watermark_at_1_id)); + WithTxn(txn_watermark_at_2, EnsureTxnExists(*bustub, _var, txn_watermark_at_2_id)); + WithTxn(txn_watermark_at_3, EnsureTxnExists(*bustub, _var, txn_watermark_at_3_id)); + WithTxn(txn_a, EnsureTxnGCed(*bustub, _var, txn_a_id)); + WithTxn(txn_b, EnsureTxnGCed(*bustub, _var, txn_b_id)); + WithTxn(txn2, EnsureTxnExists(*bustub, _var, txn2_id)); + WithTxn(txn3, EnsureTxnExists(*bustub, _var, txn3_id)); + WithTxn(txn_watermark_at_1, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{0, 0, 0}, {1, 1, 1}, {2, 2, 2}, {3, 3, 3}})); + WithTxn(txn_watermark_at_2, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{10, 0, 0}, {11, 1, 1}, {12, 2, 2}, {13, 3, 3}})); + WithTxn(txn_watermark_at_3, + QueryShowResult(*bustub, _var, _txn, query, IntResult{{20, 0, 0}, {12, 2, 2}, {13, 3, 3}})); +} + +// NOLINTEND(bugprone-unchecked-optional-access)) + +} // namespace bustub diff --git a/test/txn/txn_index_test.cpp b/test/txn/txn_index_test.cpp new file mode 100644 index 000000000..74de5b8ed --- /dev/null +++ b/test/txn/txn_index_test.cpp @@ -0,0 +1,159 @@ +#include // NOLINT +#include +#include // NOLINT +#include // NOLINT +#include "common/macros.h" +#include "execution/execution_common.h" +#include "fmt/core.h" +#include "txn_common.h" // NOLINT + +namespace bustub { + +// NOLINTBEGIN(bugprone-unchecked-optional-access) + +TEST(TxnIndexTest, DISABLED_IndexInsertTest) { // NOLINT + auto bustub = std::make_unique(); + const std::string query = "SELECT * FROM maintable"; + + Execute(*bustub, "CREATE TABLE maintable(a int primary key, b int)"); + auto table_info = bustub->catalog_->GetTable("maintable"); + auto txn1 = BeginTxn(*bustub, "txn1"); + WithTxn(txn1, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (1, 0)")); + TxnMgrDbg("after txn1 insert", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {1, 0}, + })); + WithTxn(txn1, ExecuteTxnTainted(*bustub, _var, _txn, "INSERT INTO maintable VALUES (1, 1)")); + TxnMgrDbg("after txn1 taint", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + TableHeapEntryNoMoreThan(*bustub, table_info, 1); + + auto txn2 = BeginTxn(*bustub, "txn2"); + WithTxn(txn2, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (2, 2)")); + TxnMgrDbg("after txn2 insert", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn2, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {2, 2}, + })); + WithTxn(txn2, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after txn2 commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + TableHeapEntryNoMoreThan(*bustub, table_info, 2); + + auto txn3 = BeginTxn(*bustub, "txn3"); + WithTxn(txn3, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (3, 3)")); + TxnMgrDbg("after txn3 insert", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn3, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {2, 2}, + {3, 3}, + })); + WithTxn(txn3, CommitTxn(*bustub, _var, _txn)); + TxnMgrDbg("after txn3 commit", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + TableHeapEntryNoMoreThan(*bustub, table_info, 3); + + auto txn4 = BeginTxn(*bustub, "txn4"); + WithTxn(txn4, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {2, 2}, + {3, 3}, + })); + WithTxn(txn4, ExecuteTxnTainted(*bustub, _var, _txn, "INSERT INTO maintable VALUES (3, 4)")); + TxnMgrDbg("after txn4 taint", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + TableHeapEntryNoMoreThan(*bustub, table_info, 3); + + auto txn5 = BeginTxn(*bustub, "txn5"); + WithTxn(txn5, ExecuteTxn(*bustub, _var, _txn, "INSERT INTO maintable VALUES (4, 4)")); + TxnMgrDbg("after txn5 insert", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + WithTxn(txn5, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {2, 2}, + {3, 3}, + {4, 4}, + })); + TableHeapEntryNoMoreThan(*bustub, table_info, 4); + + auto txn6 = BeginTxn(*bustub, "txn6"); + WithTxn(txn6, QueryShowResult(*bustub, _var, _txn, query, + IntResult{ + {2, 2}, + {3, 3}, + })); +} + +TEST(TxnIndexTest, DISABLED_IndexConcurrentInsertTest) { // NOLINT + const auto generate_sql = [](int thread_id, int n) -> std::string { + return fmt::format("INSERT INTO maintable VALUES ({}, {})", n, thread_id); + }; + for (int n = 0; n < 50; n++) { + auto bustub = std::make_unique(); + Execute(*bustub, "CREATE TABLE maintable(a int primary key, b int)"); + std::vector insert_threads; + const int thread_cnt = 8; + const int number_cnt = 80; + insert_threads.reserve(thread_cnt); + std::map> operation_result; + std::mutex result_mutex; + fmt::println(stderr, "trial {}: running with {} threads with {} rows", n + 1, thread_cnt, number_cnt); + global_disable_execution_exception_print.store(true); + for (int thread = 0; thread < thread_cnt; thread++) { + insert_threads.emplace_back([&bustub, thread, generate_sql, &result_mutex, &operation_result]() { + NoopWriter writer; + std::vector result; + result.reserve(number_cnt); + for (int i = 0; i < number_cnt; i++) { + auto sql = generate_sql(thread, i); + auto *txn = bustub->txn_manager_->Begin(); + if (bustub->ExecuteSqlTxn(sql, writer, txn)) { + result.push_back(true); + BUSTUB_ENSURE(bustub->txn_manager_->Commit(txn), "cannot commit??"); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { + result.push_back(false); + } + } + { + std::lock_guard lck(result_mutex); + operation_result.emplace(thread, std::move(result)); + } + }); + } + for (auto &&thread : insert_threads) { + thread.join(); + } + global_disable_execution_exception_print.store(false); + std::vector> expected_rows; + std::map winner_stats; + for (int i = 0; i < number_cnt; i++) { + int winner = -1; + for (int j = 0; j < thread_cnt; j++) { + if (operation_result[j][i]) { + if (winner != -1) { + fmt::println(stderr, "multiple winner for inserting {}: [{}]", i, fmt::join(operation_result[j], ",")); + std::terminate(); + } + winner = j; + } + } + if (winner == -1) { + fmt::println(stderr, "no winner for inserting {}"); + std::terminate(); + } + winner_stats[winner]++; + expected_rows.push_back({i, winner}); + } + for (auto &&[winner, cnt] : winner_stats) { + if (cnt == number_cnt) { + fmt::println(stderr, "WARNING: biased winner {}: cnt={}", winner, cnt); + std::terminate(); + } + } + auto query_txn = BeginTxn(*bustub, "query_txn"); + WithTxn(query_txn, QueryShowResult(*bustub, _var, _txn, "SELECT * FROM maintable", expected_rows)); + auto entry = TableHeapEntry(*bustub, bustub->catalog_->GetTable("maintable")); + fmt::println("{} entries in the table heap", entry); + } +} + +// NOLINTEND(bugprone-unchecked-optional-access)) + +} // namespace bustub diff --git a/test/txn/txn_scan_test.cpp b/test/txn/txn_scan_test.cpp new file mode 100644 index 000000000..8a267326b --- /dev/null +++ b/test/txn/txn_scan_test.cpp @@ -0,0 +1,168 @@ +#include "txn_common.h" // NOLINT + +namespace bustub { + +// NOLINTBEGIN(bugprone-unchecked-optional-access) + +TEST(TxnScanTest, DISABLED_TupleReconstructTest) { // NOLINT + auto schema = ParseCreateStatement("a integer,b double,c boolean"); + { + fmt::println(stderr, "A: only base tuple"); + auto base_tuple = Tuple{{Int(0), Double(1), BoolNull()}, schema.get()}; + auto base_meta = TupleMeta{2333, false}; + auto tuple = ReconstructTuple(schema.get(), base_tuple, base_meta, {}); + ASSERT_TRUE(tuple.has_value()); + VerifyTuple(schema.get(), *tuple, {Int(0), Double(1), BoolNull()}); + } + { + fmt::println(stderr, "B: deleted base tuple + reconstruct 1 record"); + auto base_tuple = Tuple{{IntNull(), DoubleNull(), BoolNull()}, schema.get()}; + auto base_meta = TupleMeta{2333, true}; + { + fmt::println(stderr, "B1: verify base tuple"); + auto tuple = ReconstructTuple(schema.get(), base_tuple, base_meta, {}); + ASSERT_FALSE(tuple.has_value()); + } + auto undo_log_1 = UndoLog{false, {true, true, true}, Tuple{{Int(1), Double(2), Bool(false)}, schema.get()}}; + { + fmt::println(stderr, "B2: verify 1st record"); + auto tuple = ReconstructTuple(schema.get(), base_tuple, base_meta, {undo_log_1}); + ASSERT_TRUE(tuple.has_value()); + VerifyTuple(schema.get(), *tuple, {Int(1), Double(2), Bool(false)}); + } + } + { + fmt::println(stderr, "(partial) C: reconstruct 4 records, one of them is empty, one of them is full change"); + auto base_tuple = Tuple{{Int(0), Double(1), BoolNull()}, schema.get()}; + auto base_meta = TupleMeta{2333, false}; + auto undo_log_1_schema = ParseCreateStatement(""); + auto undo_log_1 = UndoLog{false, {false, false, false}, Tuple{{}, undo_log_1_schema.get()}}; + { + fmt::println(stderr, "C1: verify 1st record"); + auto tuple = ReconstructTuple(schema.get(), base_tuple, base_meta, {undo_log_1}); + ASSERT_TRUE(tuple.has_value()); + VerifyTuple(schema.get(), *tuple, {Int(0), Double(1), BoolNull()}); + } + } + { + fmt::println(stderr, "(partial) D: reconstruct 4 records, two of them are delete"); + auto base_tuple = Tuple{{Int(0), Double(1), BoolNull()}, schema.get()}; + auto base_meta = TupleMeta{2333, false}; + auto delete_schema = ParseCreateStatement(""); + auto delete_log = UndoLog{true, {false, false, false}, Tuple{{}, delete_schema.get()}}; + { + fmt::println(stderr, "D1: apply delete record"); + auto tuple = ReconstructTuple(schema.get(), base_tuple, base_meta, {delete_log}); + ASSERT_FALSE(tuple.has_value()); + } + } +} + +TEST(TxnScanTest, DISABLED_ScanTest) { // NOLINT + auto bustub = std::make_unique(); + auto schema = ParseCreateStatement("a integer,b double,c boolean"); + auto modify_schema = ParseCreateStatement("a integer"); + auto empty_schema = ParseCreateStatement(""); + auto table_info = bustub->catalog_->CreateTable(nullptr, "maintable", *schema); + + // record1: txn4 (val=1) -> ts=1 in txn4 (val=2/prev_log_1) + // record2: ts=3 (val=3) -> ts=2 in txn_store_3 (delete/prev_log_2) -> ts=1 in txn_store_2 (val=4/prev_log_3) + // record3: ts=4 (delete) -> ts=3 in txn_store_4 (val=5/prev_log_4) + // record4: txn3 (delete) -> ts=2 in txn3 (val=6/prev_log_5) -> ts=1 in txn_store_2 (val=7/prev_log_6) + + UndoLink prev_log_1; + UndoLink prev_log_2; + UndoLink prev_log_3; + UndoLink prev_log_4; + UndoLink prev_log_5; + UndoLink prev_log_6; + auto txn0 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn0->GetReadTs(), 0); + + { + auto txn_store_1 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn_store_1->GetReadTs(), 0); + bustub->txn_manager_->Commit(txn_store_1); + ASSERT_EQ(txn_store_1->GetCommitTs(), 1); + } + + auto txn1 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn1->GetReadTs(), 1); + + { + auto txn_store_2 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn_store_2->GetReadTs(), 1); + prev_log_3 = txn_store_2->AppendUndoLog( + UndoLog{false, {true, true, true}, Tuple{{Int(4), DoubleNull(), BoolNull()}, schema.get()}, 1, {}}); + prev_log_6 = + txn_store_2->AppendUndoLog(UndoLog{false, {true, false, false}, Tuple{{Int(7)}, modify_schema.get()}, 1, {}}); + bustub->txn_manager_->Commit(txn_store_2); + ASSERT_EQ(txn_store_2->GetCommitTs(), 2); + } + + auto txn2 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn2->GetReadTs(), 2); + + { + auto txn_store_3 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn_store_3->GetReadTs(), 2); + prev_log_2 = + txn_store_3->AppendUndoLog(UndoLog{true, {false, false, false}, Tuple{{}, empty_schema.get()}, 2, prev_log_3}); + bustub->txn_manager_->Commit(txn_store_3); + ASSERT_EQ(txn_store_3->GetCommitTs(), 3); + } + + auto txn3 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn3->GetReadTs(), 3); + + prev_log_5 = txn3->AppendUndoLog( + UndoLog{false, {true, true, true}, Tuple{{Int(6), DoubleNull(), BoolNull()}, schema.get()}, 2, prev_log_6}); + + { + auto txn_store_4 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn_store_4->GetReadTs(), 3); + prev_log_4 = txn_store_4->AppendUndoLog( + UndoLog{false, {true, true, true}, Tuple{{Int(5), DoubleNull(), BoolNull()}, schema.get()}, 3, {}}); + bustub->txn_manager_->Commit(txn_store_4); + ASSERT_EQ(txn_store_4->GetCommitTs(), 4); + } + + auto txn4 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn4->GetReadTs(), 4); + + { + auto txn_store_5 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn_store_5->GetReadTs(), 4); + bustub->txn_manager_->Commit(txn_store_5); + } + + auto txn5 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn5->GetReadTs(), 5); + + prev_log_1 = txn4->AppendUndoLog(UndoLog{false, {true, false, false}, Tuple{{Int(2)}, modify_schema.get()}, 1, {}}); + + auto rid1 = *table_info->table_->InsertTuple(TupleMeta{txn4->GetTransactionTempTs(), false}, + Tuple{{Int(1), DoubleNull(), BoolNull()}, schema.get()}); + bustub->txn_manager_->UpdateVersionLink(rid1, VersionUndoLink{prev_log_1}, nullptr); + auto rid2 = *table_info->table_->InsertTuple(TupleMeta{txn3->GetReadTs(), false}, + Tuple{{Int(3), DoubleNull(), BoolNull()}, schema.get()}); + bustub->txn_manager_->UpdateVersionLink(rid2, VersionUndoLink{prev_log_2}, nullptr); + auto rid3 = *table_info->table_->InsertTuple(TupleMeta{txn4->GetReadTs(), true}, + Tuple{{IntNull(), DoubleNull(), BoolNull()}, schema.get()}); + bustub->txn_manager_->UpdateVersionLink(rid3, VersionUndoLink{prev_log_4}, nullptr); + auto rid4 = *table_info->table_->InsertTuple(TupleMeta{txn3->GetTransactionTempTs(), true}, + Tuple{{IntNull(), DoubleNull(), BoolNull()}, schema.get()}); + bustub->txn_manager_->UpdateVersionLink(rid4, VersionUndoLink{prev_log_5}, nullptr); + + TxnMgrDbg("before verify scan", bustub->txn_manager_.get(), table_info, table_info->table_.get()); + + auto query = "SELECT a FROM maintable"; + fmt::println(stderr, "A: Verify txn0"); + WithTxn(txn0, QueryShowResult(*bustub, _var, _txn, query, IntResult{})); + fmt::println(stderr, "B: Verify txn1"); + WithTxn(txn1, QueryShowResult(*bustub, _var, _txn, query, IntResult{{2}, {4}, {7}})); +} + +// NOLINTEND(bugprone-unchecked-optional-access)) + +} // namespace bustub diff --git a/test/txn/txn_timestamp_test.cpp b/test/txn/txn_timestamp_test.cpp new file mode 100644 index 000000000..4fc310506 --- /dev/null +++ b/test/txn/txn_timestamp_test.cpp @@ -0,0 +1,56 @@ +#include "txn_common.h" // NOLINT + +namespace bustub { + +TEST(TxnTsTest, DISABLED_WatermarkPerformance) { // NOLINT + const int txn_n = 1000000; + { + auto watermark = Watermark(0); + for (int i = 0; i < txn_n; i++) { + watermark.AddTxn(i); + ASSERT_EQ(watermark.GetWatermark(), 0); + } + for (int i = 0; i < txn_n; i++) { + watermark.UpdateCommitTs(i + 1); + watermark.RemoveTxn(i); + ASSERT_EQ(watermark.GetWatermark(), i + 1); + } + } + { + auto watermark = Watermark(0); + for (int i = 0; i < txn_n; i++) { + watermark.AddTxn(i); + ASSERT_EQ(watermark.GetWatermark(), 0); + } + for (int i = 0; i < txn_n; i++) { + watermark.UpdateCommitTs(i + 1); + watermark.RemoveTxn(txn_n - i - 1); + if (i == txn_n - 1) { + ASSERT_EQ(watermark.GetWatermark(), txn_n); + } else { + ASSERT_EQ(watermark.GetWatermark(), 0); + } + } + } +} + +TEST(TxnTsTest, DISABLED_TimestampTracking) { // NOLINT + auto bustub = std::make_unique(); + auto txn0 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn0->GetReadTs(), 0); + ASSERT_EQ(bustub->txn_manager_->GetWatermark(), 0); + bustub->txn_manager_->Commit(txn0); + ASSERT_EQ(bustub->txn_manager_->GetWatermark(), 1); + + auto txn1 = bustub->txn_manager_->Begin(); + ASSERT_EQ(txn1->GetReadTs(), 1); + ASSERT_EQ(bustub->txn_manager_->GetWatermark(), 1); + bustub->txn_manager_->Abort(txn1); + ASSERT_EQ(bustub->txn_manager_->GetWatermark(), 1); + + // test case continues on Gradescope... +} + +// NOLINTEND(bugprone-unchecked-optional-access)) + +} // namespace bustub diff --git a/tools/terrier_bench/terrier.cpp b/tools/terrier_bench/terrier.cpp index 37a2039b9..a501f4531 100644 --- a/tools/terrier_bench/terrier.cpp +++ b/tools/terrier_bench/terrier.cpp @@ -67,14 +67,14 @@ struct TerrierTotalMetrics { auto update_txn_per_sec = committed_update_txn_cnt_ / static_cast(elsped) * 1000; auto verify_txn_per_sec = committed_verify_txn_cnt_ / static_cast(elsped) * 1000; - fmt::print("<<< BEGIN\n"); + fmt::print(stderr, "<<< BEGIN\n"); // ensure the verifying thread is not blocked - fmt::print("update: {}\n", update_txn_per_sec); - fmt::print("count: {}\n", count_txn_per_sec); - fmt::print("verify: {}\n", verify_txn_per_sec); + fmt::print(stderr, "update: {}\n", update_txn_per_sec); + fmt::print(stderr, "count: {}\n", count_txn_per_sec); + fmt::print(stderr, "verify: {}\n", verify_txn_per_sec); - fmt::print(">>> END\n"); + fmt::print(stderr, ">>> END\n"); } }; @@ -102,6 +102,7 @@ struct TerrierMetrics { auto elsped = now - start_time_; if (elsped - last_report_at_ > 1000) { fmt::print( + stderr, "[{:5.2f}] {}: total_committed_txn={:<5} total_aborted_txn={:<5} throughput={:<6.3} avg_throughput={:<6.3}\n", elsped / 1000.0, reporter_, committed_txn_cnt_, aborted_txn_cnt_, (committed_txn_cnt_ - last_committed_txn_cnt_) / static_cast(elsped - last_report_at_) * 1000, @@ -127,15 +128,11 @@ auto ParseBool(const std::string &str) -> bool { throw bustub::Exception(fmt::format("unexpected arg: {}", str)); } -void CheckTableLock(bustub::Transaction *txn) { - if (!txn->GetExclusiveTableLockSet()->empty() || !txn->GetSharedTableLockSet()->empty()) { - fmt::print("should not acquire S/X table lock, grab IS/IX instead"); - exit(1); - } -} +void CheckTableLock(bustub::Transaction *txn) {} // NOLINTNEXTLINE auto main(int argc, char **argv) -> int { + const auto isolation_lvl = bustub::IsolationLevel::SNAPSHOT_ISOLATION; argparse::ArgumentParser program("bustub-terrier-bench"); program.add_argument("--duration").help("run terrier bench for n milliseconds"); program.add_argument("--force-create-index").help("create index in terrier bench"); @@ -153,11 +150,10 @@ auto main(int argc, char **argv) -> int { } auto bustub = std::make_unique(); - bustub->txn_manager_->SetTerrier(); auto writer = bustub::SimpleStreamWriter(std::cerr); // create schema - auto schema = "CREATE TABLE nft(id int, terrier int);"; + auto schema = "CREATE TABLE nft(id int PRIMARY KEY, terrier int);"; std::cerr << "x: create schema" << std::endl; bustub->ExecuteSql(schema, writer); @@ -209,6 +205,16 @@ auto main(int argc, char **argv) -> int { std::cerr << "x: benchmark for " << duration_ms << "ms" << std::endl; std::cerr << "x: nft_num=" << bustub_nft_num << std::endl; + std::cerr << "x: please ensure plans are correct for all queries." << std::endl; + + { + bustub->ExecuteSql("explain (o) UPDATE nft SET terrier = 0 WHERE id = 0", writer); + bustub->ExecuteSql("explain (o) DELETE FROM nft WHERE id = 0", writer); + bustub->ExecuteSql("explain (o) INSERT INTO nft VALUES (0, 0)", writer); + bustub->ExecuteSql("explain (o) SELECT * from nft", writer); + bustub->ExecuteSql("explain (o) SELECT count(*) FROM nft WHERE terrier = 0", writer); + } + // initialize data std::cerr << "x: initialize data" << std::endl; std::string query = "INSERT INTO nft VALUES "; @@ -224,13 +230,13 @@ auto main(int argc, char **argv) -> int { { std::stringstream ss; auto writer = bustub::SimpleStreamWriter(ss, true); - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); auto success = bustub->ExecuteSqlTxn(query, writer, txn); BUSTUB_ENSURE(success, "txn not success"); bustub->txn_manager_->Commit(txn); - delete txn; + if (ss.str() != fmt::format("{}\t\n", bustub_nft_num)) { - fmt::print("unexpected result \"{}\" when insert\n", ss.str()); + fmt::print(stderr, "unexpected result \"{}\" when insert\n", ss.str()); exit(1); } } @@ -266,18 +272,18 @@ auto main(int argc, char **argv) -> int { bool txn_success = true; if (verbose) { - fmt::print("begin: thread {} update nft {} to terrier {}\n", thread_id, nft_id, terrier_id); + fmt::print(stderr, "begin: thread {} update nft {} to terrier {}\n", thread_id, nft_id, terrier_id); } if (enable_update) { - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); std::string query = fmt::format("UPDATE nft SET terrier = {} WHERE id = {}", terrier_id, nft_id); if (!bustub->ExecuteSqlTxn(query, writer, txn)) { txn_success = false; } if (txn_success && ss.str() != "1\t\n") { - fmt::print("unexpected result \"{}\",\n", ss.str()); + fmt::print(stderr, "unexpected result when update \"{}\",\n", ss.str()); exit(1); } @@ -289,9 +295,9 @@ auto main(int argc, char **argv) -> int { bustub->txn_manager_->Abort(txn); metrics.TxnAborted(); } - delete txn; + } else { - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); std::string query = fmt::format("DELETE FROM nft WHERE id = {}", nft_id); if (!bustub->ExecuteSqlTxn(query, writer, txn)) { @@ -299,14 +305,14 @@ auto main(int argc, char **argv) -> int { } if (txn_success && ss.str() != "1\t\n") { - fmt::print("unexpected result \"{}\",\n", ss.str()); + fmt::print(stderr, "unexpected result when delete \"{}\",\n", ss.str()); exit(1); } if (!txn_success) { bustub->txn_manager_->Abort(txn); metrics.TxnAborted(); - delete txn; + } else { query = fmt::format("INSERT INTO nft VALUES ({}, {})", nft_id, terrier_id); if (!bustub->ExecuteSqlTxn(query, writer, txn)) { @@ -314,7 +320,7 @@ auto main(int argc, char **argv) -> int { } if (txn_success && ss.str() != "1\t\n1\t\n") { - fmt::print("unexpected result \"{}\",\n", ss.str()); + fmt::print(stderr, "unexpected result when insert \"{}\",\n", ss.str()); exit(1); } @@ -326,12 +332,11 @@ auto main(int argc, char **argv) -> int { bustub->txn_manager_->Commit(txn); metrics.TxnCommitted(); } - delete txn; } } if (verbose) { - fmt::print("end : thread {} update nft {} to terrier {}\n", thread_id, nft_id, terrier_id); + fmt::print(stderr, "end : thread {} update nft {} to terrier {}\n", thread_id, nft_id, terrier_id); } metrics.Report(); @@ -355,7 +360,7 @@ auto main(int argc, char **argv) -> int { auto writer = bustub::SimpleStreamWriter(ss, true); auto terrier_id = terrier_uniform_dist(gen); - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); bool txn_success = true; std::string query = fmt::format("SELECT count(*) FROM nft WHERE terrier = {}", terrier_id); @@ -371,7 +376,6 @@ auto main(int argc, char **argv) -> int { bustub->txn_manager_->Abort(txn); metrics.TxnAborted(); } - delete txn; metrics.Report(); } @@ -392,7 +396,7 @@ auto main(int argc, char **argv) -> int { std::stringstream ss; auto writer = bustub::SimpleStreamWriter(ss, true); - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(bustub::IsolationLevel::SNAPSHOT_ISOLATION); bool txn_success = true; std::string query = "SELECT * FROM nft"; @@ -413,18 +417,18 @@ auto main(int argc, char **argv) -> int { // Due to how BusTub works for now, it is impossible to get more than bustub_nft_num rows, but it is possible to // get fewer than that number. if (all_nfts_integer.size() != bustub_nft_num) { - fmt::print("unexpected result when verifying length. scan result: {}, total rows: {}.\n", + fmt::print(stderr, "unexpected result when verifying length. scan result: {}, total rows: {}.\n", all_nfts_integer.size(), bustub_nft_num); if (bustub_nft_num <= 100) { - fmt::print("This is everything in your database:\n{}", ss.str()); + fmt::print(stderr, "This is everything in your database:\n{}", ss.str()); } exit(1); } for (int i = 0; i < static_cast(bustub_nft_num); i++) { if (all_nfts_integer[i] != i) { - fmt::print("unexpected result when verifying \"{} == {}\",\n", i, all_nfts_integer[i]); + fmt::print(stderr, "unexpected result when verifying \"{} == {}\",\n", i, all_nfts_integer[i]); if (bustub_nft_num <= 100) { - fmt::print("This is everything in your database:\n{}", ss.str()); + fmt::print(stderr, "This is everything in your database:\n{}", ss.str()); } exit(1); } @@ -442,9 +446,10 @@ auto main(int argc, char **argv) -> int { if (txn_success) { if (ss.str() != prev_result) { - fmt::print("ERROR: non repeatable read!\n"); + fmt::print(stderr, "ERROR: non repeatable read!\n"); if (bustub_nft_num <= 100) { - fmt::print("This is everything in your database:\n--- previous query ---\n{}\n--- this query ---\n{}\n", + fmt::print(stderr, + "This is everything in your database:\n--- previous query ---\n{}\n--- this query ---\n{}\n", prev_result, ss.str()); } exit(1); @@ -460,7 +465,6 @@ auto main(int argc, char **argv) -> int { bustub->txn_manager_->Abort(txn); metrics.TxnAborted(); } - delete txn; metrics.Report(); @@ -480,18 +484,18 @@ auto main(int argc, char **argv) -> int { { std::stringstream ss; auto writer = bustub::SimpleStreamWriter(ss, true); - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); bustub->ExecuteSqlTxn("SELECT count(*) FROM nft", writer, txn); bustub->txn_manager_->Commit(txn); - delete txn; + if (ss.str() != fmt::format("{}\t\n", bustub_nft_num)) { - fmt::print("unexpected result \"{}\" when verifying total nft count\n", ss.str()); + fmt::print(stderr, "unexpected result \"{}\" when verifying total nft count\n", ss.str()); exit(1); } } { - auto txn = bustub->txn_manager_->Begin(nullptr, bustub::IsolationLevel::REPEATABLE_READ); + auto txn = bustub->txn_manager_->Begin(isolation_lvl); size_t cnt = 0; for (int i = 0; i < static_cast(BUSTUB_TERRIER_CNT); i++) { std::stringstream ss; @@ -509,9 +513,9 @@ auto main(int argc, char **argv) -> int { } bustub->txn_manager_->Commit(txn); - delete txn; + if (cnt != bustub_nft_num) { - fmt::print("unexpected result \"{} != {}\" when verifying split nft count\n", cnt, bustub_nft_num); + fmt::print(stderr, "unexpected result \"{} != {}\" when verifying split nft count\n", cnt, bustub_nft_num); exit(1); } } @@ -520,7 +524,7 @@ auto main(int argc, char **argv) -> int { if (total_metrics.committed_verify_txn_cnt_ <= 3 || total_metrics.committed_update_txn_cnt_ < 3 || total_metrics.committed_count_txn_cnt_ < 3) { - fmt::print("too many txn are aborted"); + fmt::print(stderr, "too many txn are aborted"); exit(1); }