Skip to content

Commit

Permalink
[refactor](execenv) remove shared ptr from exec env
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 26, 2024
1 parent 0cae978 commit abb2fbf
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 18 deletions.
23 changes: 11 additions & 12 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ class ExecEnv {
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; }
NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }

std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; }
StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); }
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
Expand All @@ -273,10 +273,10 @@ class ExecEnv {
_memtable_memory_limiter.reset(limiter);
}
void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; }
void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) {
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) {
this->_new_load_stream_mgr = new_load_stream_mgr;
}
void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> stream_load_executor) {
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
this->_stream_load_executor = stream_load_executor;
}

Expand All @@ -294,8 +294,8 @@ class ExecEnv {
void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; }
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
void set_wal_mgr(std::unique_ptr<WalManager>&& wm) { this->_wal_manager = wm; }
void set_dummy_lru_cache(std::unique_ptr<DummyLRUCache>&& dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();
Expand Down Expand Up @@ -331,7 +331,7 @@ class ExecEnv {
return _inverted_index_query_cache;
}
QueryCache* get_query_cache() { return _query_cache; }
std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; }
DummyLRUCache* get_dummy_lru_cache() { return _dummy_lru_cache.get(); }

pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
return _runtime_filter_timer_queue;
Expand Down Expand Up @@ -429,13 +429,12 @@ class ExecEnv {
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
std::unique_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;

std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
Expand All @@ -446,7 +445,7 @@ class ExecEnv {
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
std::unique_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

Expand All @@ -473,7 +472,7 @@ class ExecEnv {
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
QueryCache* _query_cache = nullptr;
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;

pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_wal_manager = WalManager::create_unique(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
Expand Down
4 changes: 2 additions & 2 deletions be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) {

TEST_F(StreamLoadTest, TestHeader) {
// 1G
auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path);
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0));
ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr));
// 1. empty info
{
auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ TEST_F(WalManagerTest, recovery_normal) {
}

TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path);
static_cast<void>(wal_mgr->init());
_env->set_wal_mgr(wal_mgr);
_env->set_wal_mgr(std::move(wal_mgr));

// 1T
size_t available_bytes = 1099511627776;
Expand Down
2 changes: 1 addition & 1 deletion be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
doris::ExecEnv::GetInstance()->set_process_profile(
doris::ProcessProfile::create_global_instance());
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_unique<doris::DummyLRUCache>());
doris::ExecEnv::GetInstance()->set_storage_page_cache(
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000));
Expand Down

0 comments on commit abb2fbf

Please sign in to comment.