Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor(backup): move backup rpc handling to replica_backup_server (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and hycdong committed Dec 28, 2020
1 parent a88e9ac commit 3157a86
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 59 deletions.
4 changes: 3 additions & 1 deletion src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ set(DUPLICATION_SRC
)

set(BACKUP_SRC backup/replica_backup_manager.cpp
backup/cold_backup_context.cpp)
backup/cold_backup_context.cpp
backup/replica_backup_server.cpp
)

set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp)

Expand Down
1 change: 1 addition & 0 deletions src/replica/backup/replica_backup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "replica_backup_manager.h"
#include "cold_backup_context.h"
#include "replica/replica.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/filesystem.h>
Expand Down
4 changes: 3 additions & 1 deletion src/replica/backup/replica_backup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

#pragma once

#include "replica/replica.h"
#include <dsn/dist/replication/replica_base.h>
#include <dsn/dist/replication/replication_types.h>

namespace dsn {
namespace replication {

class replica;
class replica_backup_manager : replica_base
{
public:
Expand Down
86 changes: 86 additions & 0 deletions src/replica/backup/replica_backup_server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "replica_backup_server.h"
#include "replica_backup_manager.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"

namespace dsn {
namespace replication {

replica_backup_server::replica_backup_server(const replica_stub *rs) : _stub(rs)
{
dsn_rpc_register_handler(RPC_COLD_BACKUP, "cold_backup", [this](message_ex *msg) {
on_cold_backup(backup_rpc::auto_reply(msg));
});
dsn_rpc_register_handler(RPC_CLEAR_COLD_BACKUP, "clear_cold_backup", [this](message_ex *msg) {
backup_clear_request clear_req;
unmarshall(msg, clear_req);
on_clear_cold_backup(clear_req);
});
}

void replica_backup_server::on_cold_backup(backup_rpc rpc)
{
const backup_request &request = rpc.request();
backup_response &response = rpc.response();

ddebug("received cold backup request: backup{%s.%s.%" PRId64 "}",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.pid = request.pid;
response.policy_name = request.policy.policy_name;
response.backup_id = request.backup_id;

if (_stub->options().cold_backup_root.empty()) {
derror("backup{%s.%s.%" PRId64
"}: cold_backup_root is empty, response ERR_OPERATION_DISABLED",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.err = ERR_OPERATION_DISABLED;
return;
}

replica_ptr rep = _stub->get_replica(request.pid);
if (rep != nullptr) {
rep->on_cold_backup(request, response);
} else {
derror("backup{%s.%s.%" PRId64 "}: replica not found, response ERR_OBJECT_NOT_FOUND",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.err = ERR_OBJECT_NOT_FOUND;
}
}

void replica_backup_server::on_clear_cold_backup(const backup_clear_request &request)
{
ddebug_f("receive clear cold backup request: backup({}.{})",
request.pid.to_string(),
request.policy_name.c_str());

replica_ptr rep = _stub->get_replica(request.pid);
if (rep != nullptr) {
rep->get_backup_manager()->on_clear_cold_backup(request);
}
}

} // namespace replication
} // namespace dsn
46 changes: 46 additions & 0 deletions src/replica/backup/replica_backup_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <dsn/dist/replication/replication_types.h>
#include <dsn/cpp/rpc_holder.h>

namespace dsn {
namespace replication {

class replica_stub;

typedef rpc_holder<backup_request, backup_response> backup_rpc;

// A server distributes the cold-backup task to the targeted replica.
class replica_backup_server
{
public:
explicit replica_backup_server(const replica_stub *rs);

private:
void on_cold_backup(backup_rpc rpc);

void on_clear_cold_backup(const backup_clear_request &request);

private:
const replica_stub *_stub;
};

} // namespace replication
} // namespace dsn
57 changes: 4 additions & 53 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include "mutation.h"
#include "bulk_load/replica_bulk_loader.h"
#include "duplication/duplication_sync_timer.h"
#include "backup/replica_backup_manager.h"
#include "backup/replica_backup_server.h"

#include <dsn/cpp/json_helper.h>
#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -758,6 +758,8 @@ void replica_stub::initialize_start()
_duplication_sync_timer->start();
}

_backup_server = dsn::make_unique<replica_backup_server>(this);

// init liveness monitor
dassert(NS_Disconnected == _state, "");
if (_options.fd_disabled == false) {
Expand Down Expand Up @@ -806,7 +808,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id)
}
}

replica_ptr replica_stub::get_replica(gpid id)
replica_ptr replica_stub::get_replica(gpid id) const
{
zauto_read_lock l(_replicas_lock);
auto it = _replicas.find(id);
Expand Down Expand Up @@ -1050,53 +1052,6 @@ void replica_stub::on_query_app_info(query_app_info_rpc rpc)
}
}

void replica_stub::on_cold_backup(backup_rpc rpc)
{
const backup_request &request = rpc.request();
backup_response &response = rpc.response();

ddebug("received cold backup request: backup{%s.%s.%" PRId64 "}",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.pid = request.pid;
response.policy_name = request.policy.policy_name;
response.backup_id = request.backup_id;

if (_options.cold_backup_root.empty()) {
derror("backup{%s.%s.%" PRId64
"}: cold_backup_root is empty, response ERR_OPERATION_DISABLED",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.err = ERR_OPERATION_DISABLED;
return;
}

replica_ptr rep = get_replica(request.pid);
if (rep != nullptr) {
rep->on_cold_backup(request, response);
} else {
derror("backup{%s.%s.%" PRId64 "}: replica not found, response ERR_OBJECT_NOT_FOUND",
request.pid.to_string(),
request.policy.policy_name.c_str(),
request.backup_id);
response.err = ERR_OBJECT_NOT_FOUND;
}
}

void replica_stub::on_clear_cold_backup(const backup_clear_request &request)
{
ddebug_f("receive clear cold backup request: backup({}.{})",
request.pid.to_string(),
request.policy_name.c_str());

replica_ptr rep = get_replica(request.pid);
if (rep != nullptr) {
rep->get_backup_manager()->on_clear_cold_backup(request);
}
}

void replica_stub::on_prepare(dsn::message_ex *request)
{
gpid id;
Expand Down Expand Up @@ -2136,10 +2091,6 @@ void replica_stub::open_service()
RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info);
register_rpc_handler_with_rpc_holder(
RPC_QUERY_APP_INFO, "query_app_info", &replica_stub::on_query_app_info);
register_rpc_handler_with_rpc_holder(
RPC_COLD_BACKUP, "cold_backup", &replica_stub::on_cold_backup);
register_rpc_handler(
RPC_CLEAR_COLD_BACKUP, "clear_cold_backup", &replica_stub::on_clear_cold_backup);
register_rpc_handler_with_rpc_holder(RPC_SPLIT_NOTIFY_CATCH_UP,
"child_notify_catch_up",
&replica_stub::on_notify_primary_split_catch_up);
Expand Down
8 changes: 4 additions & 4 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ typedef rpc_holder<query_replica_info_request, query_replica_info_response> quer
typedef rpc_holder<replica_configuration, learn_response> copy_checkpoint_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
typedef rpc_holder<query_app_info_request, query_app_info_response> query_app_info_rpc;
typedef rpc_holder<backup_request, backup_response> backup_rpc;
typedef rpc_holder<notify_catch_up_request, notify_cacth_up_response> notify_catch_up_rpc;
typedef rpc_holder<group_bulk_load_request, group_bulk_load_response> group_bulk_load_rpc;

Expand All @@ -74,6 +73,7 @@ typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;

class duplication_sync_timer;
class replica_bulk_loader;
class replica_backup_server;
class replica_stub : public serverlet<replica_stub>, public ref_counter
{
public:
Expand Down Expand Up @@ -106,8 +106,6 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
void on_query_replica_info(query_replica_info_rpc rpc);
void on_query_disk_info(query_disk_info_rpc rpc);
void on_query_app_info(query_app_info_rpc rpc);
void on_cold_backup(backup_rpc rpc);
void on_clear_cold_backup(const backup_clear_request &request);
void on_bulk_load(bulk_load_rpc rpc);

//
Expand Down Expand Up @@ -152,8 +150,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
//
// common routines for inquiry
//
replica_ptr get_replica(gpid id);
replica_ptr get_replica(gpid id) const;
replication_options &options() { return _options; }
const replication_options &options() const { return _options; }
bool is_connected() const { return NS_Connected == _state; }
virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); }
rpc_address primary_address() const { return _primary_address; }
Expand Down Expand Up @@ -328,6 +327,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
::dsn::task_ptr _mem_release_timer_task;

std::unique_ptr<duplication_sync_timer> _duplication_sync_timer;
std::unique_ptr<replica_backup_server> _backup_server;

// command_handlers
dsn_handle_t _kill_partition_command;
Expand Down

0 comments on commit 3157a86

Please sign in to comment.