Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add project 4 starter code #643

Merged
merged 17 commits into from
Nov 6, 2023
4 changes: 0 additions & 4 deletions src/common/bustub_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,9 @@ auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer,
try {
auto result = ExecuteSqlTxn(sql, writer, txn, std::move(check_options));
txn_manager_->Commit(txn);
delete txn;
return result;
} catch (bustub::Exception &ex) {
txn_manager_->Abort(txn);
delete txn;
throw ex;
}
}
Expand Down Expand Up @@ -347,7 +345,6 @@ void BustubInstance::GenerateTestTable() {
l.unlock();

txn_manager_->Commit(txn);
delete txn;
}

/**
Expand All @@ -366,7 +363,6 @@ void BustubInstance::GenerateMockTable() {
l.unlock();

txn_manager_->Commit(txn);
delete txn;
}

BustubInstance::~BustubInstance() {
Expand Down
2 changes: 2 additions & 0 deletions src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ std::chrono::duration<int64_t> log_timeout = std::chrono::seconds(1);

std::chrono::milliseconds cycle_detection_interval = std::chrono::milliseconds(50);

std::atomic<bool> global_disable_execution_exception_print{false};

} // namespace bustub
9 changes: 5 additions & 4 deletions src/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
add_library(
bustub_concurrency
OBJECT
lock_manager.cpp
transaction_manager.cpp)
transaction_manager.cpp
transaction_manager_impl.cpp
watermark.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:bustub_concurrency>
PARENT_SCOPE)
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:bustub_concurrency>
PARENT_SCOPE)
43 changes: 31 additions & 12 deletions src/concurrency/transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,53 @@

#include "concurrency/transaction_manager.h"

#include <memory>
#include <mutex> // NOLINT
#include <optional>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>

#include "catalog/catalog.h"
#include "catalog/column.h"
#include "catalog/schema.h"
#include "common/config.h"
#include "common/exception.h"
#include "common/macros.h"
#include "concurrency/transaction.h"
#include "execution/execution_common.h"
#include "storage/table/table_heap.h"
#include "storage/table/tuple.h"
#include "type/type_id.h"
#include "type/value.h"
#include "type/value_factory.h"

namespace bustub {

void TransactionManager::Commit(Transaction *txn) {
// Release all the locks.
ReleaseLocks(txn);
auto TransactionManager::Begin(IsolationLevel isolation_level) -> Transaction * {
std::unique_lock<std::shared_mutex> l(txn_map_mutex_);
auto txn_id = next_txn_id_++;
auto txn = std::make_unique<Transaction>(txn_id, isolation_level);
auto *txn_ref = txn.get();
txn_map_.insert(std::make_pair(txn_id, std::move(txn)));

txn->SetState(TransactionState::COMMITTED);
}

void TransactionManager::Abort(Transaction *txn) {
/* TODO: revert all the changes in write set */
// TODO(fall2023): set the timestamps and compute watermark.

ReleaseLocks(txn);
return txn_ref;
}

txn->SetState(TransactionState::ABORTED);
auto TransactionManager::Commit(Transaction *txn) -> bool {
std::lock_guard<std::mutex> commit_lck(commit_mutex_);
// TODO(fall2023): Implement me!
txn->state_ = TransactionState::COMMITTED;
return true;
}

void TransactionManager::BlockAllTransactions() { UNIMPLEMENTED("block is not supported now!"); }
void TransactionManager::Abort(Transaction *txn) {
// TODO(fall2023): Implement me!
txn->state_ = TransactionState::ABORTED;
}

void TransactionManager::ResumeTransactions() { UNIMPLEMENTED("resume is not supported now!"); }
void TransactionManager::GarbageCollection() { UNIMPLEMENTED("not implemented"); }

} // namespace bustub
112 changes: 112 additions & 0 deletions src/concurrency/transaction_manager_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// DO NOT CHANGE THIS FILE, this file will not be included in the autograder.

#include <exception>
#include <memory>
#include <mutex> // NOLINT
#include <optional>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>

#include "catalog/catalog.h"
#include "catalog/column.h"
#include "catalog/schema.h"
#include "common/config.h"
#include "common/exception.h"
#include "common/macros.h"
#include "concurrency/transaction.h"
#include "concurrency/transaction_manager.h"
#include "execution/execution_common.h"
#include "storage/table/table_heap.h"
#include "storage/table/tuple.h"
#include "type/type_id.h"
#include "type/value.h"
#include "type/value_factory.h"

namespace bustub {

auto TransactionManager::UpdateVersionLink(RID rid, std::optional<VersionUndoLink> prev_version,
std::function<bool(std::optional<VersionUndoLink>)> &&check) -> bool {
std::unique_lock<std::shared_mutex> lck(version_info_mutex_);
std::shared_ptr<PageVersionInfo> pg_ver_info = nullptr;
auto iter = version_info_.find(rid.GetPageId());
if (iter == version_info_.end()) {
pg_ver_info = std::make_shared<PageVersionInfo>();
version_info_[rid.GetPageId()] = pg_ver_info;
} else {
pg_ver_info = iter->second;
}
std::unique_lock<std::shared_mutex> lck2(pg_ver_info->mutex_);
lck.unlock();
auto iter2 = pg_ver_info->prev_version_.find(rid.GetSlotNum());
if (iter2 == pg_ver_info->prev_version_.end()) {
if (check != nullptr && !check(std::nullopt)) {
return false;
}
} else {
if (check != nullptr && !check(iter2->second)) {
return false;
}
}
if (prev_version.has_value()) {
pg_ver_info->prev_version_[rid.GetSlotNum()] = *prev_version;
} else {
pg_ver_info->prev_version_.erase(rid.GetSlotNum());
}
return true;
}

auto TransactionManager::GetVersionLink(RID rid) -> std::optional<VersionUndoLink> {
std::shared_lock<std::shared_mutex> lck(version_info_mutex_);
auto iter = version_info_.find(rid.GetPageId());
if (iter == version_info_.end()) {
return std::nullopt;
}
std::shared_ptr<PageVersionInfo> pg_ver_info = iter->second;
std::unique_lock<std::shared_mutex> lck2(pg_ver_info->mutex_);
lck.unlock();
auto iter2 = pg_ver_info->prev_version_.find(rid.GetSlotNum());
if (iter2 == pg_ver_info->prev_version_.end()) {
return std::nullopt;
}
return std::make_optional(iter2->second);
}

auto TransactionManager::GetUndoLink(RID rid) -> std::optional<UndoLink> {
auto version_link = GetVersionLink(rid);
if (version_link.has_value()) {
return version_link->prev_;
}
return std::nullopt;
}

auto TransactionManager::GetUndoLogOptional(UndoLink link) -> std::optional<UndoLog> {
std::shared_lock<std::shared_mutex> lck(txn_map_mutex_);
auto iter = txn_map_.find(link.prev_txn_);
if (iter == txn_map_.end()) {
return std::nullopt;
}
auto txn = iter->second;
lck.unlock();
return txn->GetUndoLog(link.prev_log_idx_);
}

auto TransactionManager::GetUndoLog(UndoLink link) -> UndoLog {
auto undo_log = GetUndoLogOptional(link);
if (undo_log.has_value()) {
return *undo_log;
}
throw Exception("undo log not exist");
}

void Transaction::SetTainted() {
auto state = state_.load();
if (state == TransactionState::RUNNING) {
state_.store(TransactionState::TAINTED);
return;
}
fmt::println(stderr, "transaction not in running state: {}", state);
std::terminate();
}

} // namespace bustub
11 changes: 11 additions & 0 deletions src/concurrency/watermark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include "concurrency/watermark.h"
#include <exception>
#include "common/exception.h"

namespace bustub {

auto Watermark::AddTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); }

auto Watermark::RemoveTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); }

} // namespace bustub
1 change: 1 addition & 0 deletions src/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_library(
OBJECT
aggregation_executor.cpp
delete_executor.cpp
execution_common.cpp
executor_factory.cpp
filter_executor.cpp
fmt_impl.cpp
Expand Down
25 changes: 25 additions & 0 deletions src/execution/execution_common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "execution/execution_common.h"
#include "catalog/catalog.h"
#include "common/config.h"
#include "common/macros.h"
#include "concurrency/transaction_manager.h"
#include "fmt/core.h"
#include "storage/table/table_heap.h"
#include "type/value.h"
#include "type/value_factory.h"

namespace bustub {

auto ReconstructTuple(const Schema *schema, const Tuple &base_tuple, const TupleMeta &base_meta,
const std::vector<UndoLog> &undo_logs) -> std::optional<Tuple> {
UNIMPLEMENTED("not implemented");
}

void TxnMgrDbg(const std::string &info, TransactionManager *txn_mgr, const TableInfo *table_info,
TableHeap *table_heap) {
// always use stderr for printing logs...
fmt::println(stderr, "debug_hook: {}", info);
// noop
}

} // namespace bustub
1 change: 1 addition & 0 deletions src/include/catalog/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Column {
* @param column_name name of the column
* @param type type of column
* @param length length of the varlen
* @param expr expression used to create this column
*/
Column(std::string column_name, TypeId type, uint32_t length)
: column_name_(std::move(column_name)),
Expand Down
14 changes: 14 additions & 0 deletions src/include/common/bustub_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ class SimpleStreamWriter : public ResultWriter {
std::string separator_;
};

class StringVectorWriter : public ResultWriter {
public:
void WriteCell(const std::string &cell) override { values_.back().push_back(cell); }
void WriteHeaderCell(const std::string &cell) override {}
void BeginHeader() override {}
void EndHeader() override {}
void BeginRow() override { values_.emplace_back(); }
void EndRow() override {}
void BeginTable(bool simplified_output) override {}
void EndTable() override {}

std::vector<std::vector<std::string>> values_;
};

class HtmlWriter : public ResultWriter {
auto Escape(const std::string &data) -> std::string {
std::string buffer;
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include <chrono> // NOLINT
#include <cstdint>

#define DISABLE_LOCK_MANAGER
#define DISABLE_CHECKPOINT_MANAGER

namespace bustub {

/** Cycle detection is performed every CYCLE_DETECTION_INTERVAL milliseconds. */
Expand Down
11 changes: 8 additions & 3 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#pragma once

#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <iostream>
Expand Down Expand Up @@ -51,6 +52,8 @@ enum class ExceptionType {
EXECUTION = 12,
};

extern std::atomic<bool> global_disable_execution_exception_print;

class Exception : public std::runtime_error {
public:
/**
Expand All @@ -75,9 +78,9 @@ class Exception : public std::runtime_error {
Exception(ExceptionType exception_type, const std::string &message, bool print = true)
: std::runtime_error(message), type_(exception_type) {
#ifndef NDEBUG
if (print) {
if (print && !global_disable_execution_exception_print.load()) {
std::string exception_message =
"\nException Type :: " + ExceptionTypeToString(type_) + "\nMessage :: " + message + "\n";
"\nException Type :: " + ExceptionTypeToString(type_) + ", Message :: " + message + "\n\n";
std::cerr << exception_message;
}
#endif
Expand Down Expand Up @@ -109,6 +112,8 @@ class Exception : public std::runtime_error {
return "Out of Memory";
case ExceptionType::NOT_IMPLEMENTED:
return "Not implemented";
case ExceptionType::EXECUTION:
return "Execution";
default:
return "Unknown";
}
Expand All @@ -127,7 +132,7 @@ class NotImplementedException : public Exception {
class ExecutionException : public Exception {
public:
ExecutionException() = delete;
explicit ExecutionException(const std::string &msg) : Exception(ExceptionType::EXECUTION, msg, false) {}
explicit ExecutionException(const std::string &msg) : Exception(ExceptionType::EXECUTION, msg, true) {}
};

} // namespace bustub
6 changes: 4 additions & 2 deletions src/include/concurrency/lock_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class LockManager {
cycle_detection_thread_ = new std::thread(&LockManager::RunCycleDetection, this);
}

~LockManager() {
#ifndef DISABLE_LOCK_MANAGER
~LockManager() {
UnlockAll();

enable_cycle_detection_ = false;
Expand All @@ -95,8 +95,10 @@ class LockManager {
cycle_detection_thread_->join();
delete cycle_detection_thread_;
}
#endif
}
#else
~LockManager() = default;
#endif

/**
* [LOCK_NOTE]
Expand Down
Loading
Loading