Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix log block not record disks requested from the DirManager (backport #53925) #53943

Merged
merged 1 commit into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions be/src/exec/spill/file_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ namespace starrocks::spill {
class FileBlockContainer {
public:
FileBlockContainer(DirPtr dir, const TUniqueId& query_id, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, std::string plan_node_name, uint64_t id)
int32_t plan_node_id, std::string plan_node_name, uint64_t id, size_t acquired_size)
: _dir(std::move(dir)),
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_plan_node_id(plan_node_id),
_plan_node_name(std::move(plan_node_name)),
_id(id) {}
_id(id),
_acquired_data_size(acquired_size) {}

~FileBlockContainer() {
// @TODO we need add a gc thread to delete file
Expand Down Expand Up @@ -81,7 +82,7 @@ class FileBlockContainer {

static StatusOr<FileBlockContainerPtr> create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id);
const std::string& plan_node_name, uint64_t id, size_t block_size);

private:
DirPtr _dir;
Expand Down Expand Up @@ -134,9 +135,10 @@ StatusOr<std::unique_ptr<io::InputStreamWrapper>> FileBlockContainer::get_readab

StatusOr<FileBlockContainerPtr> FileBlockContainer::create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id) {
auto container =
std::make_shared<FileBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id, plan_node_name, id);
const std::string& plan_node_name, uint64_t id,
size_t block_size) {
auto container = std::make_shared<FileBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, block_size);
RETURN_IF_ERROR(container->open());
return container;
}
Expand Down Expand Up @@ -205,8 +207,8 @@ StatusOr<BlockPtr> FileBlockManager::acquire_block(const AcquireBlockOptions& op
AcquireDirOptions acquire_dir_opts;
acquire_dir_opts.data_size = opts.block_size;
ASSIGN_OR_RETURN(auto dir, _dir_mgr->acquire_writable_dir(acquire_dir_opts));
ASSIGN_OR_RETURN(auto block_container,
get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id, opts.name));
ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.block_size));
auto res = std::make_shared<FileBlock>(block_container);
res->set_is_remote(dir->is_remote());
return res;
Expand All @@ -223,14 +225,15 @@ Status FileBlockManager::release_block(BlockPtr block) {
StatusOr<FileBlockContainerPtr> FileBlockManager::get_or_create_container(const DirPtr& dir,
const TUniqueId& fragment_instance_id,
int32_t plan_node_id,
const std::string& plan_node_name) {
const std::string& plan_node_name,
size_t block_size) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir() << ", plan node:" << plan_node_id << ", "
<< plan_node_name;
uint64_t id = _next_container_id++;
std::string container_dir = dir->dir() + "/" + print_id(_query_id);
RETURN_IF_ERROR(dir->fs()->create_dir_if_missing(container_dir));
ASSIGN_OR_RETURN(auto block_container, FileBlockContainer::create(dir, _query_id, fragment_instance_id,
plan_node_id, plan_node_name, id));
plan_node_id, plan_node_name, id, block_size));
RETURN_IF_ERROR(block_container->open());
return block_container;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/spill/file_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class FileBlockManager : public BlockManager {

private:
StatusOr<FileBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name);
int32_t plan_node_id, const std::string& plan_node_name,
size_t block_size);

TUniqueId _query_id;
std::atomic<uint64_t> _next_container_id = 0;
Expand Down
24 changes: 14 additions & 10 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ namespace starrocks::spill {
class LogBlockContainer {
public:
LogBlockContainer(DirPtr dir, const TUniqueId& query_id, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, std::string plan_node_name, uint64_t id, bool direct_io)
int32_t plan_node_id, std::string plan_node_name, uint64_t id, bool direct_io,
size_t acquired_size)
: _dir(std::move(dir)),
_query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_plan_node_id(plan_node_id),
_plan_node_name(std::move(plan_node_name)),
_id(id),
_direct_io(direct_io) {}
_direct_io(direct_io),
_acquired_data_size(acquired_size) {}

~LogBlockContainer() {
TRACE_SPILL_LOG << "delete spill container file: " << path();
Expand Down Expand Up @@ -96,7 +98,8 @@ class LogBlockContainer {

static StatusOr<LogBlockContainerPtr> create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id, bool enable_direct_io);
const std::string& plan_node_name, uint64_t id, bool enable_direct_io,
size_t block_size);

private:
DirPtr _dir;
Expand Down Expand Up @@ -159,10 +162,10 @@ StatusOr<std::unique_ptr<io::InputStreamWrapper>> LogBlockContainer::get_readabl

StatusOr<LogBlockContainerPtr> LogBlockContainer::create(const DirPtr& dir, const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, uint64_t id,
bool direct_io) {
const std::string& plan_node_name, uint64_t id, bool direct_io,
size_t block_size) {
auto container = std::make_shared<LogBlockContainer>(dir, query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, direct_io);
plan_node_name, id, direct_io, block_size);
RETURN_IF_ERROR(container->open());
return container;
}
Expand Down Expand Up @@ -255,8 +258,9 @@ StatusOr<BlockPtr> LogBlockManager::acquire_block(const AcquireBlockOptions& opt
ASSIGN_OR_RETURN(auto dir, ExecEnv::GetInstance()->spill_dir_mgr()->acquire_writable_dir(acquire_dir_opts));
#endif

ASSIGN_OR_RETURN(auto block_container, get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id,
opts.name, opts.direct_io, opts.affinity_group));
ASSIGN_OR_RETURN(auto block_container,
get_or_create_container(dir, opts.fragment_instance_id, opts.plan_node_id, opts.name,
opts.direct_io, opts.affinity_group, opts.block_size));
auto res = std::make_shared<LogBlock>(block_container, block_container->size());
res->set_is_remote(dir->is_remote());
res->set_affinity_group(opts.affinity_group);
Expand Down Expand Up @@ -297,7 +301,7 @@ Status LogBlockManager::release_affinity_group(const BlockAffinityGroup affinity

StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
const DirPtr& dir, const TUniqueId& fragment_instance_id, int32_t plan_node_id,
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group) {
const std::string& plan_node_name, bool direct_io, BlockAffinityGroup affinity_group, size_t block_size) {
TRACE_SPILL_LOG << "get_or_create_container at dir: " << dir->dir()
<< ". fragment instance: " << print_id(fragment_instance_id) << ", plan node:" << plan_node_id
<< ", " << plan_node_name;
Expand All @@ -319,7 +323,7 @@ StatusOr<LogBlockContainerPtr> LogBlockManager::get_or_create_container(
std::string container_dir = dir->dir() + "/" + print_id(_query_id);
RETURN_IF_ERROR(dir->fs()->create_dir_if_missing(container_dir));
ASSIGN_OR_RETURN(auto block_container, LogBlockContainer::create(dir, _query_id, fragment_instance_id, plan_node_id,
plan_node_name, id, direct_io));
plan_node_name, id, direct_io, block_size));
RETURN_IF_ERROR(block_container->open());
return block_container;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/spill/log_block_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class LogBlockManager : public BlockManager {
private:
StatusOr<LogBlockContainerPtr> get_or_create_container(const DirPtr& dir, const TUniqueId& fragment_instance_id,
int32_t plan_node_id, const std::string& plan_node_name,
bool direct_io, BlockAffinityGroup affinity_group);
bool direct_io, size_t block_size,
BlockAffinityGroup affinity_group);

private:
typedef phmap::flat_hash_map<uint64_t, LogBlockContainerPtr> ContainerMap;
Expand Down
17 changes: 17 additions & 0 deletions be/test/io/spill_block_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,21 @@ TEST_F(SpillBlockManagerTest, hybird_block_allocation_test) {
ASSERT_EQ(block->debug_string(), expected);
}
}

TEST_F(SpillBlockManagerTest, dir_allocate_test) {
auto log_block_mgr = std::make_shared<spill::LogBlockManager>(dummy_query_id, local_dir_mgr.get());
ASSERT_OK(log_block_mgr->open());
{
// 1. allocate the first block but not release it
spill::AcquireBlockOptions opts{.query_id = dummy_query_id,
.fragment_instance_id = dummy_query_id,
.plan_node_id = 1,
.name = "node1",
.block_size = 10};
auto res = log_block_mgr->acquire_block(opts);
ASSERT_TRUE(res.ok());
}
ASSERT_EQ(local_dir_mgr->_dirs[0]->get_current_size(), 0);
}

} // namespace starrocks::vectorized
Loading