diff --git a/src/include/buffer/lru_k_replacer.h b/src/include/buffer/lru_k_replacer.h index b2ec65e72..d24eea386 100644 --- a/src/include/buffer/lru_k_replacer.h +++ b/src/include/buffer/lru_k_replacer.h @@ -23,7 +23,7 @@ namespace bustub { -enum class AccessType { Unknown = 0, Get, Scan }; +enum class AccessType { Unknown = 0, Lookup, Scan, Index }; class LRUKNode { private: diff --git a/src/include/storage/disk/disk_manager_memory.h b/src/include/storage/disk/disk_manager_memory.h index a300f3825..615b1c6d2 100644 --- a/src/include/storage/disk/disk_manager_memory.h +++ b/src/include/storage/disk/disk_manager_memory.h @@ -10,6 +10,7 @@ // //===----------------------------------------------------------------------===// #include +#include // NOLINT #include #include #include // NOLINT @@ -25,6 +26,7 @@ #include "common/config.h" #include "common/exception.h" #include "common/logger.h" +#include "fmt/core.h" #include "storage/disk/disk_manager.h" namespace bustub { @@ -63,7 +65,7 @@ class DiskManagerMemory : public DiskManager { */ class DiskManagerUnlimitedMemory : public DiskManager { public: - DiskManagerUnlimitedMemory() = default; + DiskManagerUnlimitedMemory() { std::fill(recent_access_.begin(), recent_access_.end(), -1); } /** * Write a page to the database file. 
@@ -71,9 +73,7 @@ class DiskManagerUnlimitedMemory : public DiskManager { * @param page_data raw page data */ void WritePage(page_id_t page_id, const char *page_data) override { - if (latency_ > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(latency_)); - } + ProcessLatency(page_id); std::unique_lock l(mutex_); if (page_id >= static_cast(data_.size())) { @@ -87,6 +87,8 @@ class DiskManagerUnlimitedMemory : public DiskManager { l.unlock(); memcpy(ptr->first.data(), page_data, BUSTUB_PAGE_SIZE); + + PostProcessLatency(page_id); } /** @@ -95,17 +97,17 @@ class DiskManagerUnlimitedMemory : public DiskManager { * @param[out] page_data output buffer */ void ReadPage(page_id_t page_id, char *page_data) override { - if (latency_ > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(latency_)); - } + ProcessLatency(page_id); std::unique_lock l(mutex_); if (page_id >= static_cast(data_.size()) || page_id < 0) { - LOG_WARN("page not exist"); + fmt::println(stderr, "page {} not in range", page_id); + std::terminate(); return; } if (data_[page_id] == nullptr) { - LOG_WARN("page not exist"); + fmt::println(stderr, "page {} not exist", page_id); + std::terminate(); return; } std::shared_ptr ptr = data_[page_id]; @@ -113,16 +115,51 @@ class DiskManagerUnlimitedMemory : public DiskManager { l.unlock(); memcpy(page_data, ptr->first.data(), BUSTUB_PAGE_SIZE); + + PostProcessLatency(page_id); } - void SetLatency(size_t latency_ms) { latency_ = latency_ms; } + void ProcessLatency(page_id_t page_id) { + uint64_t sleep_micro_sec = 1000; // for random access, 1ms latency + if (latency_simulator_enabled_) { + std::unique_lock lck(latency_processor_mutex_); + for (auto &recent_page_id : recent_access_) { + if ((recent_page_id & (~0x3)) == (page_id & (~0x3))) { + sleep_micro_sec = 100; // for access in the same "block", 0.1ms latency + break; + } + if (page_id >= recent_page_id && page_id <= recent_page_id + 3) { + sleep_micro_sec = 100; // for sequential access, 0.1ms 
latency + break; + } + } + lck.unlock(); + std::this_thread::sleep_for(std::chrono::microseconds(sleep_micro_sec)); + } + } + + void PostProcessLatency(page_id_t page_id) { + if (latency_simulator_enabled_) { + std::scoped_lock lck(latency_processor_mutex_); + recent_access_[access_ptr_] = page_id; + access_ptr_ = (access_ptr_ + 1) % recent_access_.size(); + } + } + + void EnableLatencySimulator(bool enabled) { latency_simulator_enabled_ = enabled; } private: - std::mutex mutex_; + bool latency_simulator_enabled_{false}; + + std::mutex latency_processor_mutex_; + std::array recent_access_; + uint64_t access_ptr_{0}; + using Page = std::array; using ProtectedPage = std::pair; + + std::mutex mutex_; std::vector> data_; - size_t latency_{0}; }; } // namespace bustub diff --git a/tools/bpm_bench/CMakeLists.txt b/tools/bpm_bench/CMakeLists.txt index b20aa1261..4d7b07815 100644 --- a/tools/bpm_bench/CMakeLists.txt +++ b/tools/bpm_bench/CMakeLists.txt @@ -1,5 +1,6 @@ set(BPM_BENCH_SOURCES bpm_bench.cpp) -add_executable(bpm-bench ${BPM_BENCH_SOURCES}) +add_executable(bpm-bench ${BPM_BENCH_SOURCES} "${PROJECT_SOURCE_DIR}/tools/backtrace.cpp") +add_backward(bpm-bench) target_link_libraries(bpm-bench bustub) set_target_properties(bpm-bench PROPERTIES OUTPUT_NAME bustub-bpm-bench) diff --git a/tools/bpm_bench/bpm_bench.cpp b/tools/bpm_bench/bpm_bench.cpp index c7b683692..90ae80047 100644 --- a/tools/bpm_bench/bpm_bench.cpp +++ b/tools/bpm_bench/bpm_bench.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include // NOLINT @@ -6,6 +7,7 @@ #include #include #include +#include #include #include @@ -29,12 +31,6 @@ auto ClockMs() -> uint64_t { return static_cast(tm.tv_sec * 1000) + static_cast(tm.tv_usec / 1000); } -static const size_t BUSTUB_SCAN_THREAD = 8; -static const size_t BUSTUB_GET_THREAD = 8; -static const size_t LRU_K_SIZE = 16; -static const size_t BUSTUB_PAGE_CNT = 6400; -static const size_t BUSTUB_BPM_SIZE = 64; - struct BpmTotalMetrics { uint64_t scan_cnt_{0}; 
uint64_t get_cnt_{0}; @@ -100,6 +96,45 @@ struct BpmMetrics { } }; +struct BustubBenchPageHeader { + uint64_t seed_; + uint64_t page_id_; + char data_[0]; +}; + +/// Modify the page and save some data inside +auto ModifyPage(char *data, size_t page_idx, uint64_t seed) -> void { + auto *pg = reinterpret_cast(data); + pg->seed_ = seed; + pg->page_id_ = page_idx; + pg->data_[pg->seed_ % 4000] = pg->seed_ % 256; +} + +/// Check the page id and verify the data inside, without checking the seed +auto CheckPageConsistentNoSeed(const char *data, size_t page_idx) -> void { + const auto *pg = reinterpret_cast(data); + if (pg->page_id_ != page_idx) { + fmt::println(stderr, "page header not consistent: page_id_={} page_idx={}", pg->page_id_, page_idx); + std::terminate(); + } + auto left = static_cast(static_cast(pg->data_[pg->seed_ % 4000])); + auto right = static_cast(pg->seed_ % 256); + if (left != right) { + fmt::println(stderr, "page content not consistent: data_[{}]={} seed_ % 256={}", pg->seed_ % 4000, left, right); + std::terminate(); + } +} + +/// Check that the page carries the expected seed, then verify the page id and data inside +auto CheckPageConsistent(const char *data, size_t page_idx, uint64_t seed) -> void { + const auto *pg = reinterpret_cast(data); + if (pg->seed_ != seed) { + fmt::println(stderr, "page seed not consistent: seed_={} seed={}", pg->seed_, seed); + std::terminate(); + } + CheckPageConsistentNoSeed(data, page_idx); +} + // NOLINTNEXTLINE auto main(int argc, char **argv) -> int { using bustub::AccessType; @@ -109,7 +144,12 @@ auto main(int argc, char **argv) -> int { argparse::ArgumentParser program("bustub-bpm-bench"); program.add_argument("--duration").help("run bpm bench for n milliseconds"); - program.add_argument("--latency").help("set disk latency to n milliseconds"); + program.add_argument("--latency").help("enable disk latency"); + program.add_argument("--scan-thread-n").help("number of scan threads"); + program.add_argument("--get-thread-n").help("number of lookup threads"); +
program.add_argument("--bpm-size").help("buffer pool size"); + program.add_argument("--db-size").help("number of pages"); + program.add_argument("--lru-k-size").help("lru-k size"); try { program.parse_args(argc, argv); @@ -124,33 +164,60 @@ auto main(int argc, char **argv) -> int { duration_ms = std::stoi(program.get("--duration")); } - uint64_t latency_ms = 0; + uint64_t enable_latency = 0; if (program.present("--latency")) { - latency_ms = std::stoi(program.get("--latency")); + enable_latency = std::stoi(program.get("--latency")); + } + + uint64_t scan_thread_n = 8; + if (program.present("--scan-thread-n")) { + scan_thread_n = std::stoi(program.get("--scan-thread-n")); + } + + uint64_t get_thread_n = 8; + if (program.present("--get-thread-n")) { + get_thread_n = std::stoi(program.get("--get-thread-n")); + } + + uint64_t bustub_page_cnt = 6400; + if (program.present("--db-size")) { + bustub_page_cnt = std::stoi(program.get("--db-size")); + } + + uint64_t bustub_bpm_size = 64; + if (program.present("--bpm-size")) { + bustub_bpm_size = std::stoi(program.get("--bpm-size")); + } + + uint64_t lru_k_size = 16; + if (program.present("--lru-k-size")) { + lru_k_size = std::stoi(program.get("--lru-k-size")); } auto disk_manager = std::make_unique(); - auto bpm = std::make_unique(BUSTUB_BPM_SIZE, disk_manager.get(), LRU_K_SIZE); + auto bpm = std::make_unique(bustub_bpm_size, disk_manager.get(), lru_k_size); std::vector page_ids; - fmt::print(stderr, "[info] total_page={}, duration_ms={}, latency_ms={}, lru_k_size={}, bpm_size={}\n", - BUSTUB_PAGE_CNT, duration_ms, latency_ms, LRU_K_SIZE, BUSTUB_BPM_SIZE); + fmt::print(stderr, + "[info] total_page={}, duration_ms={}, latency={}, lru_k_size={}, bpm_size={}, scan_thread_cnt={}, " + "get_thread_cnt={}\n", + bustub_page_cnt, duration_ms, enable_latency, lru_k_size, bustub_bpm_size, scan_thread_n, get_thread_n); - for (size_t i = 0; i < BUSTUB_PAGE_CNT; i++) { + for (size_t i = 0; i < bustub_page_cnt; i++) { page_id_t
page_id; auto *page = bpm->NewPage(&page_id); if (page == nullptr) { throw std::runtime_error("new page failed"); } - char &ch = page->GetData()[i % 1024]; - ch = 1; + + ModifyPage(page->GetData(), i, 0); bpm->UnpinPage(page_id, true); page_ids.push_back(page_id); } // enable disk latency after creating all pages - disk_manager->SetLatency(latency_ms); + disk_manager->EnableLatencySimulator(enable_latency != 0); fmt::print(stderr, "[info] benchmark start\n"); @@ -158,13 +225,18 @@ auto main(int argc, char **argv) -> int { total_metrics.Begin(); std::vector threads; + using ModifyRecord = std::unordered_map; + + for (size_t thread_id = 0; thread_id < scan_thread_n; thread_id++) { + threads.emplace_back([bustub_page_cnt, scan_thread_n, thread_id, &page_ids, &bpm, duration_ms, &total_metrics] { + ModifyRecord records; - for (size_t thread_id = 0; thread_id < BUSTUB_SCAN_THREAD; thread_id++) { - threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] { BpmMetrics metrics(fmt::format("scan {:>2}", thread_id), duration_ms); metrics.Begin(); - size_t page_idx = BUSTUB_PAGE_CNT * thread_id / BUSTUB_SCAN_THREAD; + size_t page_idx_start = bustub_page_cnt * thread_id / scan_thread_n; + size_t page_idx_end = bustub_page_cnt * (thread_id + 1) / scan_thread_n; + size_t page_idx = page_idx_start; while (!metrics.ShouldFinish()) { auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Scan); @@ -172,54 +244,53 @@ auto main(int argc, char **argv) -> int { continue; } - char &ch = page->GetData()[page_idx % 1024]; page->WLatch(); - ch += 1; - if (ch == 0) { - ch = 1; - } + auto &seed = records[page_idx]; + CheckPageConsistent(page->GetData(), page_idx, seed); + seed = seed + 1; + ModifyPage(page->GetData(), page_idx, seed); page->WUnlatch(); bpm->UnpinPage(page->GetPageId(), true, AccessType::Scan); - page_idx = (page_idx + 1) % BUSTUB_PAGE_CNT; + page_idx += 1; + if (page_idx >= page_idx_end) { + page_idx = page_idx_start; + } metrics.Tick(); 
metrics.Report(); } total_metrics.ReportScan(metrics.cnt_); - })); + }); } - for (size_t thread_id = 0; thread_id < BUSTUB_GET_THREAD; thread_id++) { - threads.emplace_back(std::thread([thread_id, &page_ids, &bpm, duration_ms, &total_metrics] { + for (size_t thread_id = 0; thread_id < get_thread_n; thread_id++) { + threads.emplace_back([thread_id, &page_ids, &bpm, bustub_page_cnt, duration_ms, &total_metrics] { std::random_device r; std::default_random_engine gen(r()); - zipfian_int_distribution dist(0, BUSTUB_PAGE_CNT - 1, 0.8); + zipfian_int_distribution dist(0, bustub_page_cnt - 1, 0.8); BpmMetrics metrics(fmt::format("get {:>2}", thread_id), duration_ms); metrics.Begin(); while (!metrics.ShouldFinish()) { auto page_idx = dist(gen); - auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Get); + auto *page = bpm->FetchPage(page_ids[page_idx], AccessType::Lookup); if (page == nullptr) { continue; } page->RLatch(); - char ch = page->GetData()[page_idx % 1024]; + CheckPageConsistentNoSeed(page->GetData(), page_idx); page->RUnlatch(); - if (ch == 0) { - throw std::runtime_error("invalid data"); - } - bpm->UnpinPage(page->GetPageId(), false, AccessType::Get); + bpm->UnpinPage(page->GetPageId(), false, AccessType::Lookup); metrics.Tick(); metrics.Report(); } total_metrics.ReportGet(metrics.cnt_); - })); + }); } for (auto &thread : threads) {