Skip to content

Commit

Permalink
[improve] Use lru cache to count the number of column in tablet schem…
Browse files Browse the repository at this point in the history
…a to control memory
  • Loading branch information
Lchangliang committed Jan 10, 2024
1 parent cc95607 commit 6db4830
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 100 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ DEFINE_Bool(group_commit_wait_replay_wal_finish, "false");

DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
DEFINE_mInt32(tablet_schema_cache_capacity, "102400");

DEFINE_Bool(exit_on_exception, "false");
// This config controls whether the s3 file writer would flush cache asynchronously
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,8 @@ DECLARE_Bool(group_commit_wait_replay_wal_finish);
DECLARE_Int32(scan_thread_nice_value);
// Used to modify the recycle interval of tablet schema cache
DECLARE_mInt32(tablet_schema_cache_recycle_interval);
// Capacity of the tablet schema cache. Granularity is at the column level:
// each cached schema is charged by its number of columns.
DECLARE_mInt32(tablet_schema_cache_capacity);

// Use `LOG(FATAL)` to replace `throw` when true
DECLARE_mBool(exit_on_exception);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS);

BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta) : _tablet_meta(std::move(tablet_meta)) {
TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema()->to_key());
_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
fmt::format("Tablet.{}", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}},
MetricEntityType::kTablet);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset

RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
*rowset_meta_ = *_rowset_meta;
rowset_meta_->init(_rowset_meta.get());
_build_rowset_meta(rowset_meta_);

RowsetSharedPtr rowset;
Expand Down
38 changes: 31 additions & 7 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
#include "io/fs/local_file_system.h"
#include "json2pb/json_to_pb.h"
#include "json2pb/pb_to_json.h"
#include "olap/lru_cache.h"
#include "olap/olap_common.h"
#include "olap/storage_policy.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"

namespace doris {

// Releases the TabletSchemaCache handle acquired by set_tablet_schema(), if any.
// `_handle` stays nullptr when no schema was ever set, in which case there is
// nothing to release.
RowsetMeta::~RowsetMeta() {
    if (_handle != nullptr) {
        TabletSchemaCache::instance()->release(_handle);
    }
}

bool RowsetMeta::init(const std::string& pb_rowset_meta) {
bool ret = _deserialize_from_pb(pb_rowset_meta);
Expand All @@ -40,16 +46,21 @@ bool RowsetMeta::init(const std::string& pb_rowset_meta) {
return true;
}

bool RowsetMeta::init(const RowsetMeta* rowset_meta) {
RowsetMetaPB rowset_meta_pb;
rowset_meta->to_rowset_pb(&rowset_meta_pb);
return init_from_pb(rowset_meta_pb);
}

bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
if (rowset_meta_pb.has_tablet_schema()) {
_schema = TabletSchemaCache::instance()->insert(
rowset_meta_pb.tablet_schema().SerializeAsString());
set_tablet_schema(rowset_meta_pb.tablet_schema());
}
// Release ownership of the TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`.
// This does not break the const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is the same
// before and after calling this method.
auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
auto schema = mut_rowset_meta_pb.release_tablet_schema();
auto* schema = mut_rowset_meta_pb.release_tablet_schema();
_rowset_meta_pb = mut_rowset_meta_pb;
mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
_init();
Expand Down Expand Up @@ -107,7 +118,21 @@ RowsetMetaPB RowsetMeta::get_rowset_pb() {
}

// Binds this meta to the cached schema identified by `tablet_schema->to_key()`.
// A handle obtained by an earlier call is released first; the schema is then
// inserted into (or fetched from) TabletSchemaCache exactly once, and both the
// returned cache handle and the shared schema pointer are stored. The stored
// handle is released in the destructor or by the next call to set_tablet_schema().
void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
    if (_handle != nullptr) {
        TabletSchemaCache::instance()->release(_handle);
    }
    auto pair = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
    _handle = pair.first;
    _schema = pair.second;
}

// Protobuf overload: binds this meta to the cached schema keyed by the
// serialized form of `tablet_schema`. Any previously held cache handle is
// released before the new handle / schema pair is stored.
void RowsetMeta::set_tablet_schema(const TabletSchemaPB& tablet_schema) {
    auto* schema_cache = TabletSchemaCache::instance();
    if (_handle != nullptr) {
        schema_cache->release(_handle);
    }
    auto inserted = schema_cache->insert(tablet_schema.SerializeAsString());
    _handle = inserted.first;
    _schema = inserted.second;
}

bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
Expand All @@ -116,8 +141,7 @@ bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
return false;
}
if (rowset_meta_pb.has_tablet_schema()) {
_schema = TabletSchemaCache::instance()->insert(
rowset_meta_pb.tablet_schema().SerializeAsString());
set_tablet_schema(rowset_meta_pb.tablet_schema());
rowset_meta_pb.clear_tablet_schema();
}
_rowset_meta_pb = rowset_meta_pb;
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

class RowsetMeta {
public:
RowsetMeta() = default;
~RowsetMeta();

bool init(const std::string& pb_rowset_meta);

bool init(const RowsetMeta* rowset_meta);

bool init_from_pb(const RowsetMetaPB& rowset_meta_pb);

bool init_from_json(const std::string& json_rowset_meta);
Expand Down Expand Up @@ -296,9 +300,14 @@ class RowsetMeta {
int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); }

void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
void set_tablet_schema(const TabletSchemaPB& tablet_schema);

const TabletSchemaSPtr& tablet_schema() { return _schema; }

// Copying is disabled because the member field '_handle' is a raw pointer to a cache
// handle; use the member function 'init(const RowsetMeta*)' in place of the copy constructor.
RowsetMeta(const RowsetMeta&) = delete;
RowsetMeta& operator=(const RowsetMeta&) = delete;

private:
bool _deserialize_from_pb(const std::string& value);

Expand All @@ -313,6 +322,7 @@ class RowsetMeta {
private:
RowsetMetaPB _rowset_meta_pb;
TabletSchemaSPtr _schema;
Cache::Handle* _handle = nullptr;
RowsetId _rowset_id;
io::FileSystemSPtr _fs;
bool _is_removed_from_rowset_meta = false;
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
new_tablet_meta_pb.set_partition_id(partition_id);
}
new_tablet_meta_pb.set_schema_hash(schema_hash);
TabletSchemaSPtr tablet_schema;
tablet_schema =
TabletSchemaCache::instance()->insert(new_tablet_meta_pb.schema().SerializeAsString());
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->init_from_pb(new_tablet_meta_pb.schema());

std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map;
std::unordered_map<RowsetId, RowsetId, HashOfRowsetId> rowset_id_mapping;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,8 @@ Status Tablet::_cooldown_data(RowsetSharedPtr rowset) {
<< ", tp=" << old_rowset->data_disk_size() / duration.count();

// gen a new rowset
auto new_rowset_meta = std::make_shared<RowsetMeta>(*old_rowset->rowset_meta());
auto new_rowset_meta = std::make_shared<RowsetMeta>();
new_rowset_meta->init(old_rowset->rowset_meta().get());
new_rowset_meta->set_rowset_id(new_rowset_id);
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
Expand Down
81 changes: 29 additions & 52 deletions be/src/olap/tablet_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,48 @@

#include "olap/tablet_schema_cache.h"

#include "bvar/bvar.h"
#include <gen_cpp/olap_file.pb.h>

namespace doris {
#include "bvar/bvar.h"
#include "olap/tablet_schema.h"

bvar::Adder<int64_t> g_tablet_schema_cache_count("tablet_schema_cache_count");
bvar::Adder<int64_t> g_tablet_schema_cache_columns_count("tablet_schema_cache_columns_count");

TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
std::lock_guard guard(_mtx);
auto iter = _cache.find(key);
if (iter == _cache.end()) {
TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
namespace doris {

std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std::string& key) {
auto* lru_handle = cache()->lookup(key);
TabletSchemaSPtr tablet_schema_ptr;
if (lru_handle) {
auto* value = (CacheValue*)cache()->value(lru_handle);
value->last_visit_time = UnixMillis();
tablet_schema_ptr = value->tablet_schema;
} else {
auto* value = new CacheValue;
value->last_visit_time = UnixMillis();
tablet_schema_ptr = std::make_shared<TabletSchema>();
TabletSchemaPB pb;
pb.ParseFromString(key);
tablet_schema_ptr->init_from_pb(pb);
_cache[key] = tablet_schema_ptr;
value->tablet_schema = tablet_schema_ptr;
auto deleter = [](const doris::CacheKey& key, void* value) {
auto* cache_value = (CacheValue*)value;
g_tablet_schema_cache_count << -1;
g_tablet_schema_cache_columns_count << -cache_value->tablet_schema->num_columns();
delete cache_value;
};
lru_handle = cache()->insert(key, value, tablet_schema_ptr->num_columns(), deleter,
CachePriority::NORMAL, 0);
g_tablet_schema_cache_count << 1;
g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns();
return tablet_schema_ptr;
}
return iter->second;
}

void TabletSchemaCache::start() {
std::thread t(&TabletSchemaCache::_recycle, this);
t.detach();
LOG(INFO) << "TabletSchemaCache started";
}

void TabletSchemaCache::stop() {
_should_stop = true;
while (!_is_stopped) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
LOG(INFO) << "TabletSchemaCache stopped";
DCHECK(lru_handle != nullptr);
return std::make_pair(lru_handle, tablet_schema_ptr);
}

/**
* @brief recycle when TabletSchemaSPtr use_count equals 1.
*/
void TabletSchemaCache::_recycle() {
int64_t check_interval = 5;
int64_t left_second = config::tablet_schema_cache_recycle_interval;
while (!_should_stop) {
if (left_second > 0) {
std::this_thread::sleep_for(std::chrono::seconds(check_interval));
left_second -= check_interval;
continue;
} else {
left_second = config::tablet_schema_cache_recycle_interval;
}

std::lock_guard guard(_mtx);
LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
if (iter->second.unique()) {
g_tablet_schema_cache_count << -1;
g_tablet_schema_cache_columns_count << -iter->second->num_columns();
iter = _cache.erase(iter);
} else {
++iter;
}
}
}
_is_stopped = true;
// Gives a handle previously returned by insert() back to the underlying cache.
// `lru_handle` is forwarded to cache()->release() without a null check, so
// callers are expected to pass a valid handle.
void TabletSchemaCache::release(Cache::Handle* lru_handle) {
cache()->release(lru_handle);
}

} // namespace doris
39 changes: 13 additions & 26 deletions be/src/olap/tablet_schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,35 @@

#pragma once

#include <gen_cpp/olap_file.pb.h>

#include <memory>
#include <mutex>
#include <unordered_map>

#include "olap/tablet_schema.h"
#include "olap/tablet_fwd.h"
#include "runtime/exec_env.h"
#include "util/doris_metrics.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

class TabletSchemaCache {
class TabletSchemaCache : public LRUCachePolicy {
public:
~TabletSchemaCache() = default;
TabletSchemaCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::TABLET_SCHEMA_CACHE, capacity,
LRUCacheType::NUMBER, config::tablet_schema_cache_recycle_interval) {}

static TabletSchemaCache* create_global_schema_cache() {
TabletSchemaCache* res = new TabletSchemaCache();
static TabletSchemaCache* create_global_schema_cache(size_t capacity) {
auto* res = new TabletSchemaCache(capacity);
return res;
}

static TabletSchemaCache* instance() {
return ExecEnv::GetInstance()->get_tablet_schema_cache();
}

TabletSchemaSPtr insert(const std::string& key);

void start();
std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key);

void stop();

private:
/**
* @brief recycle when TabletSchemaSPtr use_count equals 1.
*/
void _recycle();
void release(Cache::Handle*);

private:
std::mutex _mtx;
std::unordered_map<std::string, TabletSchemaSPtr> _cache;
std::atomic_bool _should_stop = {false};
std::atomic_bool _is_stopped = {false};
struct CacheValue : public LRUCacheValueBase {
TabletSchemaSPtr tablet_schema;
};
};

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();

_tablet_schema_cache = TabletSchemaCache::create_global_schema_cache();
_tablet_schema_cache->start();
_tablet_schema_cache =
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);

// S3 buffer pool
_s3_buffer_pool = new io::S3FileBufferPool();
Expand Down Expand Up @@ -522,7 +522,6 @@ void ExecEnv::destroy() {

SAFE_STOP(_wal_manager);
_wal_manager.reset();
SAFE_STOP(_tablet_schema_cache);
SAFE_STOP(_load_channel_mgr);
SAFE_STOP(_scanner_scheduler);
SAFE_STOP(_broker_mgr);
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/memory/cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class CachePolicy {
TABLET_VERSION_CACHE = 10,
LAST_SUCCESS_CHANNEL_CACHE = 11,
COMMON_OBJ_LRU_CACHE = 12,
FOR_UT = 13
FOR_UT = 13,
TABLET_SCHEMA_CACHE = 14,
};

static std::string type_string(CacheType type) {
Expand Down Expand Up @@ -73,6 +74,8 @@ class CachePolicy {
return "CommonObjLRUCache";
case CacheType::FOR_UT:
return "ForUT";
case CacheType::TABLET_SCHEMA_CACHE:
return "TabletSchemaCache";
default:
LOG(FATAL) << "not match type of cache policy :" << static_cast<int>(type);
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/rowset/rowset_meta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class RowsetMetaTest : public testing::Test {
std::string _json_rowset_meta;
};

void do_check(RowsetMeta rowset_meta) {
void do_check(const RowsetMeta& rowset_meta) {
RowsetId rowset_id;
rowset_id.init(540081);
EXPECT_EQ(rowset_id, rowset_meta.rowset_id());
Expand Down
Loading

0 comments on commit 6db4830

Please sign in to comment.