Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulated updates for heap profiling #9240

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ void AsynchronousMetrics::update()
set("LogDiskBytes", usage.total_log_disk_size);
set("PagesInMem", usage.num_pages);
set("VersionedEntries", DB::PS::PageStorageMemorySummary::versioned_entry_or_delete_count.load());
set("UniversalWrite", DB::PS::PageStorageMemorySummary::universal_write_count.load());
}

if (context.getSharedContextDisagg()->isDisaggregatedStorageMode())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ TMTContext::TMTContext(

void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
{
kvstore->fetchProxyConfig(proxy_helper);
if (!raftproxy_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()
&& !context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
kvstore->fetchProxyConfig(proxy_helper);
if (kvstore->getProxyConfigSummay().valid)
{
LOG_INFO(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/PageStorageMemorySummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct PageStorageMemorySummary
static inline std::atomic_int64_t uni_page_id_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_count{0};
static inline std::atomic_int64_t universal_write_count{0};
};

} // namespace DB::PS
} // namespace DB::PS
15 changes: 13 additions & 2 deletions dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class UniversalWriteBatch : private boost::noncopyable
: prefix(std::move(prefix_))
{}

~UniversalWriteBatch() { PS::PageStorageMemorySummary::universal_write_count.fetch_sub(writes.size()); }

void putPage(
PageIdU64 page_id,
UInt64 tag,
Expand Down Expand Up @@ -126,6 +128,7 @@ class UniversalWriteBatch : private boost::noncopyable
Write w{WriteBatchWriteType::PUT, page_id, tag, read_buffer, size, "", std::move(offsets)};
total_data_size += size;
writes.emplace_back(std::move(w));
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

void putPage(
Expand Down Expand Up @@ -156,6 +159,7 @@ class UniversalWriteBatch : private boost::noncopyable
data_location};
writes.emplace_back(std::move(w));
has_writes_from_remote = true;
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

void updateRemotePage(const UniversalPageId & page_id, const ReadBufferPtr & read_buffer, PageSize size)
Expand All @@ -164,13 +168,15 @@ class UniversalWriteBatch : private boost::noncopyable
total_data_size += size;
writes.emplace_back(std::move(w));
// This is use for update local page data from remote, don't need to set `has_writes_from_remote`
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

void putExternal(const UniversalPageId & page_id, UInt64 tag)
{
// External page's data is not managed by PageStorage, which means data is empty.
Write w{WriteBatchWriteType::PUT_EXTERNAL, page_id, tag, nullptr, 0, "", {}};
writes.emplace_back(std::move(w));
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

void putRemoteExternal(const UniversalPageId & page_id, const PS::V3::CheckpointLocation & data_location)
Expand All @@ -179,19 +185,22 @@ class UniversalWriteBatch : private boost::noncopyable
Write w{WriteBatchWriteType::PUT_EXTERNAL, page_id, /*tag*/ 0, nullptr, 0, "", {}, data_location};
writes.emplace_back(std::move(w));
has_writes_from_remote = true;
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

// Add RefPage{ref_id} -> Page{page_id}
void putRefPage(const UniversalPageId & ref_id, const UniversalPageId & page_id)
{
Write w{WriteBatchWriteType::REF, ref_id, 0, nullptr, 0, page_id, {}};
writes.emplace_back(std::move(w));
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

void delPage(const UniversalPageId & page_id)
{
Write w{WriteBatchWriteType::DEL, page_id, 0, nullptr, 0, "", {}};
writes.emplace_back(std::move(w));
PS::PageStorageMemorySummary::universal_write_count.fetch_add(1);
}

bool empty() const { return writes.empty(); }
Expand Down Expand Up @@ -276,11 +285,13 @@ class UniversalWriteBatch : private boost::noncopyable

UniversalWriteBatch(UniversalWriteBatch && rhs) noexcept
: prefix(std::move(rhs.prefix))
, writes(std::move(rhs.writes))
, total_data_size(rhs.total_data_size)
, has_writes_from_remote(rhs.has_writes_from_remote)
, remote_lock_disabled(rhs.remote_lock_disabled)
{}
{
PS::PageStorageMemorySummary::universal_write_count.fetch_sub(writes.size());
writes = std::move(rhs.writes);
}

void swap(UniversalWriteBatch & o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,18 +568,58 @@ TEST(UniPageStorageIdTest, UniversalPageIdMemoryTrace)
auto u_id = UniversalPageIdFormat::toFullPageId("aaa", 100);
auto page1_mem = PS::PageStorageMemorySummary::uni_page_id_bytes.load();
auto ps = page1_mem - prim_mem;
// copy construct
auto u_id_cpy = u_id;
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem + ps * 2);
// move assignment
UniversalPageId u_id_mv = UniversalPageIdFormat::toFullPageId("aaa", 100);
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem + ps * 3);
u_id_mv = std::move(u_id_cpy);
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem + ps * 2);
// copy assignment
UniversalPageId u_id_cpy2 = UniversalPageIdFormat::toFullPageId("aaa", 100);
u_id_cpy2 = u_id_mv;
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem + ps * 3);
// move construct
auto u_id_mv2 = std::move(u_id_cpy2);
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem + ps * 3);
}
ASSERT_EQ(PS::PageStorageMemorySummary::uni_page_id_bytes.load(), prim_mem);
}


TEST(UniPageStorageIdTest, UniversalWriteBatchMemory)
{
const String prefix = "aaa";
const UInt64 tag = 0;
static constexpr size_t buf_sz = 1024;
char c_buff[buf_sz] = {};
for (size_t i = 0; i < buf_sz; ++i)
{
c_buff[i] = i % 0xff;
}
{
UniversalWriteBatch wb;
wb.putPage(
UniversalPageIdFormat::toFullPageId(prefix, 0),
tag,
std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz),
buf_sz);
ASSERT_EQ(PageStorageMemorySummary::universal_write_count.load(), 1);
}
ASSERT_EQ(PageStorageMemorySummary::universal_write_count.load(), 0);
{
UniversalWriteBatch wb;
wb.putPage(
UniversalPageIdFormat::toFullPageId(prefix, 0),
tag,
std::make_shared<ReadBufferFromMemory>(c_buff, buf_sz),
buf_sz);
UniversalWriteBatch wb2 = std::move(wb);
ASSERT_EQ(PageStorageMemorySummary::universal_write_count.load(), 1);
}
ASSERT_EQ(PageStorageMemorySummary::universal_write_count.load(), 0);
}

} // namespace PS::universal::tests
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,4 +858,5 @@ TEST(LogFileRWTest2, ManuallySync)
ASSERT_EQ(scratch, payload);
}
}

} // namespace DB::PS::V3::tests
6 changes: 4 additions & 2 deletions libs/libcommon/cmake/find_jemalloc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.

option (ENABLE_JEMALLOC "Set to TRUE to use jemalloc" ON)
# TODO: Make ENABLE_JEMALLOC_PROF default value to ON after https://github.com/pingcap/tics/issues/3236 get fixed.
option (ENABLE_JEMALLOC_PROF "Set to ON to enable jemalloc profiling" OFF)
# 1. The deadlock mentioned in https://github.com/pingcap/tics/issues/3236 is not related to ENABLE_JEMALLOC_PROF.
# 2. It is also expected to be eliminated even if the heap profiling is activated, with a newer version of pprof-rs.
# TODO: Enable continuous heap profiling after we make sure statement 2.
option (ENABLE_JEMALLOC_PROF "Set to ON to enable jemalloc profiling" ON)
option (USE_INTERNAL_JEMALLOC_LIBRARY "Set to FALSE to use system jemalloc library instead of bundled" ${NOT_UNBUNDLED})

if (ENABLE_JEMALLOC AND (CMAKE_BUILD_TYPE_UC STREQUAL "ASAN" OR CMAKE_BUILD_TYPE_UC STREQUAL "UBSAN" OR CMAKE_BUILD_TYPE_UC STREQUAL "TSAN"))
Expand Down