Skip to content

Commit

Permalink
[BugFix] Fix log block not record disks requested from the DirManager
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Dec 13, 2024
1 parent 424cd9b commit 12ce117
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 22 deletions.
23 changes: 13 additions & 10 deletions be/src/exec/spill/file_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ namespace starrocks::spill {
class FileBlockContainer {
public:
FileBlockContainer(DirPtr dir, const TUniqueId& query_id, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, std::string plan_node_name, uint64_t id)
int32_t plan_node_id, std::string plan_node_name, uint64_t id, size_t acquired_size)
: _dir(std::move(dir)),
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_plan_node_id(plan_node_id),
_plan_node_name(std::move(plan_node_name)),
_id(id) {}
_id(id),
_acquired_data_size(acquired_size) {}

~FileBlockContainer() {
// @TODO we need add a gc thread to delete file
Expand Down Expand Up @@ -81,7 +82,7 @@ class FileBlockContainer {

static StatusOr<FileBlockContainerPtr> create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id);
const std::string& plan_node_name, uint64_t id, size_t block_size);

private:
DirPtr _dir;
Expand Down Expand Up @@ -134,9 +135,10 @@ StatusOr<std::unique_ptr<io::InputStreamWrapper>> FileBlockContainer::get_readab

// Factory for FileBlockContainer: builds the container with std::make_shared, opens it, and
// returns it. A failing open() is propagated to the caller through RETURN_IF_ERROR.
// NOTE(review): this span is a rendered diff hunk, so the removed and the added versions of
// the signature tail and of the make_shared call appear one after the other.
// In the added version `block_size` is forwarded as the constructor's trailing argument
// (`acquired_size`, which initializes `_acquired_data_size`).
StatusOr<FileBlockContainerPtr> FileBlockContainer::create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id) {
auto container =
std::make_shared<FileBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id, plan_node_name, id);
const std::string& plan_node_name, uint64_t id,
size_t block_size) {
auto container = std::make_shared<FileBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, block_size);
// Open before returning so callers never receive an unopened container.
RETURN_IF_ERROR(container->open());
return container;
}
Expand Down Expand Up @@ -205,8 +207,8 @@ StatusOr<BlockPtr> FileBlockManager::acquire_block(const AcquireBlockOptions& op
AcquireDirOptions acquire_dir_opts;
acquire_dir_opts.data_size = opts.block_size;
ASSIGN_OR_RETURN(auto dir, _dir_mgr->acquire_writable_dir(acquire_dir_opts));
ASSIGN_OR_RETURN(auto block_container,
get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id, opts.name));
ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.block_size));
auto res = std::make_shared<FileBlock>(block_container);
res->set_is_remote(dir->is_remote());
return res;
Expand All @@ -223,14 +225,15 @@ Status FileBlockManager::release_block(BlockPtr block) {
// Creates a FileBlockContainer under `<dir>/<query id>` for the given fragment instance and
// plan node, and returns it. Any failure (directory creation, container creation, open) is
// returned to the caller as an error status.
// NOTE(review): this span is a rendered diff hunk, so the removed and the added versions of
// the signature tail and of the create() argument list appear one after the other. The added
// version threads `block_size` through to FileBlockContainer::create.
StatusOr<FileBlockContainerPtr> FileBlockManager::get_or_create_container(const DirPtr& dir,
const TUniqueId& fragment_instance_id,
int32_t plan_node_id,
const std::string& plan_node_name) {
const std::string& plan_node_name,
size_t block_size) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir() << ", plan node:" << plan_node_id << ", "
<< plan_node_name;
// Every call takes a fresh id from the manager-wide counter.
uint64_t id = _next_container_id++;
// Containers of one query live in a per-query sub-directory of the chosen dir.
std::string container_dir = dir->dir() + "/" + print_id(_query_id);
RETURN_IF_ERROR(dir->fs()->create_dir_if_missing(container_dir));
ASSIGN_OR_RETURN(auto block_container, FileBlockContainer::create(dir, _query_id, fragment_instance_id,
plan_node_id, plan_node_name, id));
plan_node_id, plan_node_name, id, block_size));
// NOTE(review): FileBlockContainer::create already calls open() on the new container, so
// this is a second open() on the same object — confirm whether that is intended.
RETURN_IF_ERROR(block_container->open());
return block_container;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/spill/file_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class FileBlockManager : public BlockManager {

private:
// Private helper used by acquire_block to obtain the container that backs a new block.
// NOTE(review): rendered diff hunk — the removed declaration tail is followed by the added
// one, which appends `block_size` as the last parameter (same order as the definition in
// file_block_manager.cpp).
StatusOr<FileBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name);
int32_t plan_node_id, const std::string& plan_node_name,
size_t block_size);

TUniqueId _query_id;
std::atomic<uint64_t> _next_container_id = 0;
Expand Down
24 changes: 14 additions & 10 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ namespace starrocks::spill {
class LogBlockContainer {
public:
LogBlockContainer(DirPtr dir, const TUniqueId& query_id, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, std::string plan_node_name, uint64_t id, bool direct_io)
int32_t plan_node_id, std::string plan_node_name, uint64_t id, bool direct_io,
size_t acquired_size)
: _dir(std::move(dir)),
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_plan_node_id(plan_node_id),
_plan_node_name(std::move(plan_node_name)),
_id(id),
_direct_io(direct_io) {}
_direct_io(direct_io),
_acquired_data_size(acquired_size) {}

~LogBlockContainer() {
TRACE_SPILL_LOG << "delete spill container file: " << path();
Expand Down Expand Up @@ -96,7 +98,8 @@ class LogBlockContainer {

static StatusOr<LogBlockContainerPtr> create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id, bool enable_direct_io);
const std::string& plan_node_name, uint64_t id, bool enable_direct_io,
size_t block_size);

private:
DirPtr _dir;
Expand Down Expand Up @@ -159,10 +162,10 @@ StatusOr<std::unique_ptr<io::InputStreamWrapper>> LogBlockContainer::get_readabl

// Factory for LogBlockContainer: builds the container with std::make_shared, opens it, and
// returns it. A failing open() is propagated to the caller through RETURN_IF_ERROR.
// NOTE(review): this span is a rendered diff hunk, so the removed and the added versions of
// the signature tail and of the make_shared argument list appear one after the other.
// In the added version `block_size` is forwarded as the constructor's trailing argument
// (`acquired_size`, which initializes `_acquired_data_size`).
StatusOr<LogBlockContainerPtr> LogBlockContainer::create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id,
bool direct_io) {
const std::string& plan_node_name, uint64_t id, bool direct_io,
size_t block_size) {
auto container = std::make_shared<LogBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, direct_io);
plan_node_name, id, direct_io, block_size);
// Open before returning so callers never receive an unopened container.
RETURN_IF_ERROR(container->open());
return container;
}
Expand Down Expand Up @@ -255,8 +258,9 @@ StatusOr<BlockPtr> LogBlockManager::acquire_block(const AcquireBlockOptions& opt
ASSIGN_OR_RETURN(auto dir, ExecEnv::GetInstance()->spill_dir_mgr()->acquire_writable_dir(acquire_dir_opts));
#endif

ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.direct_io, opts.affinity_group));
ASSIGN_OR_RETURN(auto block_container,
get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id, opts.name,
opts.direct_io, opts.affinity_group, opts.block_size));
auto res = std::make_shared<LogBlock>(block_container, block_container->size());
res->set_is_remote(dir->is_remote());
res->set_affinity_group(opts.affinity_group);
Expand Down Expand Up @@ -297,7 +301,7 @@ Status LogBlockManager::release_affinity_group(const BlockAffinityGroup affinity

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
const DirPtr& dir, const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group) {
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group, size_t block_size) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir()
<< ". fragment instance: " << print_id(fragment_instance_id) << ", plan node:" << plan_node_id
<< ", " << plan_node_name;
Expand All @@ -319,7 +323,7 @@ StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
std::string container_dir = dir->dir() + "/" + print_id(_query_id);
RETURN_IF_ERROR(dir->fs()->create_dir_if_missing(container_dir));
ASSIGN_OR_RETURN(auto block_container, LogBlockContainer::create(dir, _query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, direct_io));
plan_node_name, id, direct_io, block_size));
RETURN_IF_ERROR(block_container->open());
return block_container;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/spill/log_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class LogBlockManager : public BlockManager {
private:
StatusOr<LogBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name,
bool direct_io, BlockAffinityGroup affinity_group);
bool direct_io, size_t block_size,
BlockAffinityGroup affinity_group);

private:
typedef phmap::flat_hash_map<uint64_t, LogBlockContainerPtr> ContainerMap;
Expand Down
17 changes: 17 additions & 0 deletions be/test/io/spill_block_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,21 @@ TEST_F(SpillBlockManagerTest, hybird_block_allocation_test) {
ASSERT_EQ(block->debug_string(), expected);
}
}

// Checks the dir's size accounting when a block acquired from a LogBlockManager goes out of
// scope without ever being passed to release_block().
TEST_F(SpillBlockManagerTest, dir_allocate_test) {
    auto block_manager = std::make_shared<spill::LogBlockManager>(dummy_query_id, local_dir_mgr.get());
    ASSERT_OK(block_manager->open());
    {
        // Acquire a single block with block_size 10; it is dropped at the end of this scope
        // and release_block() is deliberately never called for it.
        spill::AcquireBlockOptions acquire_opts{.query_id = dummy_query_id,
                                                .fragment_instance_id = dummy_query_id,
                                                .plan_node_id = 1,
                                                .name = "node1",
                                                .block_size = 10};
        auto acquired = block_manager->acquire_block(acquire_opts);
        ASSERT_TRUE(acquired.ok());
    }
    // With the acquired block gone, the first local dir is expected to report a current size of 0.
    ASSERT_EQ(local_dir_mgr->_dirs[0]->get_current_size(), 0);
}

} // namespace starrocks::vectorized

0 comments on commit 12ce117

Please sign in to comment.