Skip to content

Commit

Permalink
Merge branch 'master' into pack
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Feb 2, 2021
2 parents e6fdf2d + a096e56 commit c6f5f6b
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 54 files
+6 −5 include/dsn/cpp/rpc_stream.h
+2 −0 include/dsn/dist/replication/replica_envs.h
+4 −1 include/dsn/dist/replication/replication.codes.h
+2 −1 include/dsn/dist/replication/replication_app_base.h
+11 −0 include/dsn/dist/replication/replication_ddl_client.h
+277 −2 include/dsn/dist/replication/replication_types.h
+2 −5 include/dsn/tool-api/rpc_message.h
+1 −2 include/dsn/tool-api/task.h
+0 −56 include/dsn/utility/callocator.h
+56 −412 include/dsn/utility/join_point.h
+1 −10 include/dsn/utility/transient_memory.h
+22 −8 src/block_service/hdfs/hdfs_service.cpp
+1 −0 src/block_service/hdfs/hdfs_service.h
+1 −1 src/client/partition_resolver_simple.h
+35 −0 src/client/replication_ddl_client.cpp
+2 −0 src/common/replication_common.cpp
+2 −0 src/common/replication_common.h
+1,217 −624 src/common/replication_types.cpp
+8 −4 src/meta/app_env_validator.cpp
+38 −0 src/meta/meta_service.cpp
+2 −0 src/meta/meta_service.h
+218 −0 src/meta/meta_split_service.cpp
+26 −0 src/meta/meta_split_service.h
+351 −0 src/meta/test/meta_split_service_test.cpp
+13 −1 src/replica/replica.cpp
+22 −18 src/replica/replica.h
+2 −7 src/replica/replica_2pc.cpp
+1 −0 src/replica/replica_check.cpp
+11 −7 src/replica/replica_context.cpp
+6 −0 src/replica/replica_context.h
+45 −32 src/replica/replica_throttle.cpp
+117 −6 src/replica/split/replica_split_manager.cpp
+9 −0 src/replica/split/replica_split_manager.h
+123 −13 src/replica/split/test/replica_split_test.cpp
+2 −1 src/replica/storage/simple_kv/simple_kv.server.impl.h
+2 −2 src/replica/storage/simple_kv/test/checker.cpp
+2 −1 src/replica/storage/simple_kv/test/simple_kv.server.impl.h
+2 −1 src/replica/test/mock_utils.h
+1 −1 src/replica/test/replica_test_base.h
+52 −0 src/replication.thrift
+0 −2 src/runtime/profiler.cpp
+17 −17 src/runtime/profiler_command.cpp
+14 −18 src/runtime/profiler_header.h
+8 −12 src/runtime/rpc/rpc_message.cpp
+3 −0 src/runtime/rpc/thrift_message_parser.cpp
+4 −3 src/runtime/service_api_c.cpp
+27 −0 src/runtime/test/rpc_message.cpp
+4 −4 src/runtime/tool_api.cpp
+0 −163 src/utils/join_point.cpp
+0 −160 src/utils/test/join_point.cpp
+87 −0 src/utils/test/join_point_test.cpp
+5 −0 src/utils/throttling_controller.cpp
+0 −3 src/utils/throttling_controller.h
+1 −43 src/utils/transient_memory.cpp
9 changes: 3 additions & 6 deletions src/redis_protocol/proxy_lib/redis_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,9 @@ bool redis_parser::parse_stream()
// string content + CR + LF
if (_total_length >= _current_str.length + 2) {
if (_current_str.length > 0) {
char *ptr =
reinterpret_cast<char *>(dsn::tls_trans_malloc(_current_str.length));
std::shared_ptr<char> str_data(ptr,
[](char *ptr) { dsn::tls_trans_free(ptr); });
eat_all(str_data.get(), _current_str.length);
_current_str.data.assign(std::move(str_data), 0, _current_str.length);
std::string str_data(_current_str.length, '\0');
eat_all(const_cast<char *>(str_data.data()), _current_str.length);
_current_str.data = dsn::blob::create_from_bytes(std::move(str_data));
}
dverify(eat(CR));
dverify(eat(LF));
Expand Down
11 changes: 7 additions & 4 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1802,19 +1802,21 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)

// Must be thread safe.
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree)
/*output*/ int64_t *last_decree,
bool flush_memtable)
{
CheckpointingTokenHelper token_helper(_is_checkpointing);
if (!token_helper.token_got()) {
return ::dsn::ERR_WRONG_TIMING;
}

return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree);
return copy_checkpoint_to_dir_unsafe(checkpoint_dir, last_decree, flush_memtable);
}

// not thread safe, should be protected by caller
::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
int64_t *checkpoint_decree)
int64_t *checkpoint_decree,
bool flush_memtable)
{
rocksdb::Checkpoint *chkpt_raw = nullptr;
auto status = rocksdb::Checkpoint::Create(_db, &chkpt_raw);
Expand All @@ -1834,7 +1836,8 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
}

// CreateCheckpoint() will not flush memtable when log_size_for_flush = max
status = chkpt->CreateCheckpoint(checkpoint_dir, std::numeric_limits<uint64_t>::max());
status = chkpt->CreateCheckpoint(checkpoint_dir,
flush_memtable ? 0 : std::numeric_limits<uint64_t>::max());
if (!status.ok()) {
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
Expand Down
6 changes: 4 additions & 2 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,15 @@ class pegasus_server_impl : public pegasus_read_service
// must be thread safe
// this method will not trigger flush(), just copy even if the app is empty.
::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override;
/*output*/ int64_t *last_decree,
bool flush_memtable = false) override;

//
// help function, just copy checkpoint to specified dir and ignore _is_checkpointing.
// if checkpoint_dir already exist, this function will delete it first.
::dsn::error_code copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
/**output*/ int64_t *checkpoint_decree);
/**output*/ int64_t *checkpoint_decree,
bool flush_memtable = false);

// get the last checkpoint
// if succeed:
Expand Down

0 comments on commit c6f5f6b

Please sign in to comment.