From 322f54ee40f83cc5349ac8172231ede00b086292 Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Wed, 27 Jan 2021 16:42:02 +0800 Subject: [PATCH 1/2] feat: remove the dependency of tls memory for redis_parser (#686) --- src/redis_protocol/proxy_lib/redis_parser.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/redis_protocol/proxy_lib/redis_parser.cpp b/src/redis_protocol/proxy_lib/redis_parser.cpp index f08276bba5..65c05ac2eb 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.cpp +++ b/src/redis_protocol/proxy_lib/redis_parser.cpp @@ -299,12 +299,9 @@ bool redis_parser::parse_stream() // string content + CR + LF if (_total_length >= _current_str.length + 2) { if (_current_str.length > 0) { - char *ptr = - reinterpret_cast(dsn::tls_trans_malloc(_current_str.length)); - std::shared_ptr str_data(ptr, - [](char *ptr) { dsn::tls_trans_free(ptr); }); - eat_all(str_data.get(), _current_str.length); - _current_str.data.assign(std::move(str_data), 0, _current_str.length); + std::string str_data(_current_str.length, '\0'); + eat_all(const_cast(str_data.data()), _current_str.length); + _current_str.data = dsn::blob::create_from_bytes(std::move(str_data)); } dverify(eat(CR)); dverify(eat(LF)); From a096e56db535848c7fb059eaca0de47aa698dc03 Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Mon, 1 Feb 2021 12:58:19 +0800 Subject: [PATCH 2/2] fix(split): add flush_memtable for copy_checkpoint_to_dir (#684) --- rdsn | 2 +- src/server/pegasus_server_impl.cpp | 11 +++++++---- src/server/pegasus_server_impl.h | 6 ++++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/rdsn b/rdsn index ab9520df9b..7404376b45 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit ab9520df9bb47be2eb7d5720098ff463cd83ec36 +Subproject commit 7404376b45cde468df2435cab3f6913fe7e2dd40 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index fb7b6002b3..38b15d7f55 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1802,19 +1802,21 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable) // Must be thread safe. ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir, - /*output*/ int64_t *last_decree) + /*output*/ int64_t *last_decree, + bool flush_memtable) { CheckpointingTokenHelper token_helper(_is_checkpointing); if (!token_helper.token_got()) { return ::dsn::ERR_WRONG_TIMING; } - return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree); + return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree, flush_memtable); } // not thread safe, should be protected by caller ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir, - int64_t *checkpoint_decree) + int64_t *checkpoint_decree, + bool flush_memtable) { rocksdb::Checkpoint *chkpt_raw = nullptr; auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw); @@ -1834,7 +1836,8 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char } // CreateCheckpoint() will not flush memtable when log_size_for_flush = max - status = chkpt->CreateCheckpoint(checkpoint_dir, std::numeric_limits::max()); + status = chkpt->CreateCheckpoint(checkpoint_dir, + flush_memtable ? 0 : std::numeric_limits::max()); if (!status.ok()) { derror_replica("CreateCheckpoint failed, error = {}", status.ToString()); if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) { diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 1703dd7c0b..5785624a5c 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -124,13 +124,15 @@ class pegasus_server_impl : public pegasus_read_service // must be thread safe // this method will not trigger flush(), just copy even if the app is empty. ::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir, - /*output*/ int64_t *last_decree) override; + /*output*/ int64_t *last_decree, + bool flush_memtable = false) override; // // help function, just copy checkpoint to specified dir and ignore _is_checkpointing. // if checkpoint_dir already exist, this function will delete it first. ::dsn::error_code copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir, - /**output*/ int64_t *checkpoint_decree); + /**output*/ int64_t *checkpoint_decree, + bool flush_memtable = false); // get the last checkpoint // if succeed: